/build/source/nativelink-store/src/fast_slow_store.rs
Line | Count | Source |
1 | | // Copyright 2024-2025 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::borrow::BorrowMut; |
16 | | use core::cmp::{max, min}; |
17 | | use core::ops::Range; |
18 | | use core::pin::Pin; |
19 | | use core::sync::atomic::{AtomicU64, Ordering}; |
20 | | use core::time::Duration; |
21 | | use std::collections::HashMap; |
22 | | use std::ffi::OsString; |
23 | | use std::sync::{Arc, Weak}; |
24 | | |
25 | | use async_trait::async_trait; |
26 | | use futures::stream::{FuturesUnordered, StreamExt}; |
27 | | use futures::{FutureExt, join, try_join}; |
28 | | use nativelink_config::stores::{FastSlowSpec, StoreDirection}; |
29 | | use nativelink_error::{Code, Error, ErrorContext, ResultExt, make_err}; |
30 | | use nativelink_metric::MetricsComponent; |
31 | | use nativelink_util::buf_channel::{ |
32 | | DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair, |
33 | | }; |
34 | | use nativelink_util::fs; |
35 | | use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator}; |
36 | | use nativelink_util::store_trait::{ |
37 | | RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations, |
38 | | UploadSizeInfo, slow_update_store_with_file, |
39 | | }; |
40 | | use parking_lot::Mutex; |
41 | | use tokio::sync::OnceCell; |
42 | | use tracing::{debug, info, trace, warn}; |
43 | | |
44 | | // TODO(palfrey) This store needs to be evaluated for more efficient memory usage, |
45 | | // there are many copies happening internally. |
46 | | |
47 | | type Loader = Arc<OnceCell<()>>; |
48 | | type MaybeSize = Option<u64>; |
49 | | |
50 | | // TODO(palfrey) We should consider copying the data in the background to allow the |
51 | | // client to hang up while the data is buffered. An alternative is to possibly make a |
52 | | // "BufferedStore" that could be placed on the "slow" store that would hang up early |
53 | | // if data is in the buffer. |
54 | | #[derive(Debug, MetricsComponent)] |
55 | | pub struct FastSlowStore { |
56 | | #[metric(group = "fast_store")] |
57 | | fast_store: Store, |
58 | | fast_direction: StoreDirection, |
59 | | #[metric(group = "slow_store")] |
60 | | slow_store: Store, |
61 | | slow_direction: StoreDirection, |
62 | | /// See [`FastSlowSpec::bypass_dedup_threshold_bytes`]. |
63 | | bypass_dedup_threshold_bytes: u64, |
64 | | weak_self: Weak<Self>, |
65 | | #[metric] |
66 | | metrics: FastSlowStoreMetrics, |
67 | | // De-duplicate requests for the fast store, only the first streams, others |
68 | | // are blocked. This may feel like it's causing a slow down of tasks, but |
69 | | // actually it's faster because we're not downloading the file multiple |
70 | | // times are doing loads of duplicate IO. |
71 | | populating_digests: Mutex<HashMap<StoreKey<'static>, Loader>>, |
72 | | // Tracks keys whose slow-store write is currently in flight, along with |
73 | | // the best-known size. Consulted by `has_with_results` so that a |
74 | | // concurrent writer that has not yet finished pushing to the slow store |
75 | | // is still visible to a concurrent existence check, preventing redundant |
76 | | // duplicate uploads of the same blob. |
77 | | in_flight_slow_writes: Mutex<HashMap<StoreKey<'static>, Arc<tokio::sync::Mutex<MaybeSize>>>>, |
78 | | } |
79 | | |
80 | | // This guard ensures that the populating_digests is cleared even if the future |
81 | | // is dropped, it is cancel safe. |
82 | | struct LoaderGuard<'a> { |
83 | | weak_store: Weak<FastSlowStore>, |
84 | | key: StoreKey<'a>, |
85 | | loader: Option<Loader>, |
86 | | is_leader: bool, |
87 | | } |
88 | | |
89 | | impl LoaderGuard<'_> { |
90 | 75 | async fn get_or_try_init<E, F, Fut>(&self, f: F) -> Result<(), E> |
91 | 75 | where |
92 | 75 | F: FnOnce() -> Fut, |
93 | 75 | Fut: Future<Output = Result<(), E>>, |
94 | 75 | { |
95 | 75 | if let Some(loader) = &self.loader { |
96 | 75 | loader.get_or_try_init(f).await.map74 (|&()| ()) |
97 | | } else { |
98 | | // This is impossible, but we do it anyway. |
99 | 0 | f().await |
100 | | } |
101 | 74 | } |
102 | | } |
103 | | |
104 | | /// Cancel-safe RAII guard that removes an entry from |
105 | | /// `FastSlowStore::in_flight_slow_writes` when dropped. This ensures the |
106 | | /// map does not leak entries if the surrounding `update()` future is |
107 | | /// cancelled before the slow-store write completes. |
108 | | struct InFlightSlowWriteGuard { |
109 | | weak_store: Weak<FastSlowStore>, |
110 | | key: Option<StoreKey<'static>>, |
111 | | write_complete_guard: tokio::sync::OwnedMutexGuard<MaybeSize>, |
112 | | } |
113 | | |
114 | | impl InFlightSlowWriteGuard { |
115 | 130 | fn complete(mut self, size: MaybeSize) { |
116 | 130 | *self.write_complete_guard = size; |
117 | 130 | } |
118 | | } |
119 | | |
120 | | impl Drop for InFlightSlowWriteGuard { |
121 | 131 | fn drop(&mut self) { |
122 | 131 | let Some(store) = self.weak_store.upgrade() else { |
123 | 0 | return; |
124 | | }; |
125 | 131 | let Some(key) = self.key.take() else { |
126 | 0 | return; |
127 | | }; |
128 | 131 | store.in_flight_slow_writes.lock().remove(&key); |
129 | 131 | } |
130 | | } |
131 | | |
132 | | impl Drop for LoaderGuard<'_> { |
133 | 75 | fn drop(&mut self) { |
134 | 75 | let Some(store) = self.weak_store.upgrade() else { |
135 | | // The store has already gone away, nothing to remove from. |
136 | 0 | return; |
137 | | }; |
138 | 75 | let Some(loader) = self.loader.take() else { |
139 | | // This should never happen, but we do it to be safe. |
140 | 0 | return; |
141 | | }; |
142 | | |
143 | 75 | let mut guard = store.populating_digests.lock(); |
144 | 75 | if let std::collections::hash_map::Entry::Occupied(occupied_entry) = |
145 | 75 | guard.entry(self.key.borrow().into_owned()) |
146 | 75 | && Arc::ptr_eq(occupied_entry.get(), &loader) |
147 | | { |
148 | 75 | drop(loader); |
149 | 75 | if Arc::strong_count(occupied_entry.get()) == 1 { |
150 | 52 | // This is the last loader, so remove it. |
151 | 52 | occupied_entry.remove(); |
152 | 52 | }23 |
153 | 0 | } |
154 | 75 | } |
155 | | } |
156 | | |
157 | | impl FastSlowStore { |
158 | 75 | pub fn new(spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> { |
159 | 75 | Arc::new_cyclic(|weak_self| Self { |
160 | 75 | fast_store, |
161 | 75 | fast_direction: spec.fast_direction, |
162 | 75 | slow_store, |
163 | 75 | slow_direction: spec.slow_direction, |
164 | | // 0 (default) disables the bypass entirely (always dedup). |
165 | 75 | bypass_dedup_threshold_bytes: spec.bypass_dedup_threshold_bytes, |
166 | 75 | weak_self: weak_self.clone(), |
167 | 75 | metrics: FastSlowStoreMetrics::default(), |
168 | 75 | populating_digests: Mutex::new(HashMap::new()), |
169 | 75 | in_flight_slow_writes: Mutex::new(HashMap::new()), |
170 | 75 | }) |
171 | 75 | } |
172 | | |
173 | 131 | fn register_in_flight_slow_write(&self, key: StoreKey<'_>) -> InFlightSlowWriteGuard { |
174 | 131 | let owned = key.into_owned(); |
175 | 131 | let write_complete = Arc::new(tokio::sync::Mutex::new(None)); |
176 | 131 | let write_complete_guard = write_complete |
177 | 131 | .clone() |
178 | 131 | .try_lock_owned() |
179 | 131 | .expect("Newly created mutex is locked"); |
180 | 131 | self.in_flight_slow_writes |
181 | 131 | .lock() |
182 | 131 | .insert(owned.borrow().into_owned(), write_complete); |
183 | 131 | InFlightSlowWriteGuard { |
184 | 131 | weak_store: self.weak_self.clone(), |
185 | 131 | key: Some(owned), |
186 | 131 | write_complete_guard, |
187 | 131 | } |
188 | 131 | } |
189 | | |
190 | | /// Digest size in bytes, or `None` for non-digest keys. |
191 | 28 | const fn digest_size_bytes(key: &StoreKey<'_>) -> Option<u64> { |
192 | 28 | match key { |
193 | 28 | StoreKey::Digest(d) => Some(d.size_bytes()), |
194 | 0 | StoreKey::Str(_) => None, |
195 | | } |
196 | 28 | } |
197 | | |
198 | | /// Whether a read should skip dedup and hit the slow store directly. A |
199 | | /// threshold of 0 disables the bypass so every read goes through dedup. |
200 | 79 | fn should_bypass_dedup(&self, key: &StoreKey<'_>) -> bool { |
201 | 79 | self.bypass_dedup_threshold_bytes != 0 |
202 | 28 | && Self::digest_size_bytes(key) |
203 | 28 | .is_some_and(|size| size >= self.bypass_dedup_threshold_bytes) |
204 | 79 | } |
205 | | |
206 | 0 | pub const fn fast_store(&self) -> &Store { |
207 | 0 | &self.fast_store |
208 | 0 | } |
209 | | |
210 | 0 | pub const fn slow_store(&self) -> &Store { |
211 | 0 | &self.slow_store |
212 | 0 | } |
213 | | |
214 | 2 | pub fn get_arc(&self) -> Option<Arc<Self>> { |
215 | 2 | self.weak_self.upgrade() |
216 | 2 | } |
217 | | |
218 | 75 | fn get_loader<'a>(&self, key: StoreKey<'a>) -> LoaderGuard<'a> { |
219 | | // Get a single loader instance that's used to populate the fast store |
220 | | // for this digest. If another request comes in then it's de-duplicated. |
221 | 75 | let mut is_leader = false; |
222 | 75 | let loader = match self |
223 | 75 | .populating_digests |
224 | 75 | .lock() |
225 | 75 | .entry(key.borrow().into_owned()) |
226 | | { |
227 | 23 | std::collections::hash_map::Entry::Occupied(occupied_entry) => { |
228 | 23 | occupied_entry.get().clone() |
229 | | } |
230 | 52 | std::collections::hash_map::Entry::Vacant(vacant_entry) => { |
231 | 52 | is_leader = true; |
232 | 52 | vacant_entry.insert(Arc::new(OnceCell::new())).clone() |
233 | | } |
234 | | }; |
235 | 75 | LoaderGuard { |
236 | 75 | weak_store: self.weak_self.clone(), |
237 | 75 | key, |
238 | 75 | loader: Some(loader), |
239 | 75 | is_leader, |
240 | 75 | } |
241 | 75 | } |
242 | | |
243 | 52 | async fn populate_and_maybe_stream( |
244 | 52 | self: Pin<&Self>, |
245 | 52 | key: StoreKey<'_>, |
246 | 52 | maybe_writer: Option<&mut DropCloserWriteHalf>, |
247 | 52 | offset: u64, |
248 | 52 | length: Option<u64>, |
249 | 52 | ) -> Result<(), Error> { |
250 | 52 | let reader_stream_size51 = if self |
251 | 52 | .slow_store |
252 | 52 | .inner_store(Some(key.borrow())) |
253 | 52 | .optimized_for(StoreOptimizations::LazyExistenceOnSync) |
254 | | { |
255 | 2 | trace!( |
256 | | %key, |
257 | 2 | store_name = %self.slow_store.inner_store(Some(key.borrow())).get_name(), |
258 | | "Skipping .has() check due to LazyExistenceOnSync optimization" |
259 | | ); |
260 | 2 | UploadSizeInfo::MaxSize(u64::MAX) |
261 | | } else { |
262 | 50 | UploadSizeInfo::ExactSize(self |
263 | 50 | .slow_store |
264 | 50 | .has(key.borrow()) |
265 | 50 | .await |
266 | 50 | .err_tip(|| "Failed to run has() on slow store")?0 |
267 | 50 | .ok_or_else(|| {1 |
268 | 1 | let err = make_err!( |
269 | 1 | Code::NotFound, |
270 | | "Object {} not found in either fast or slow store. \ |
271 | | If using multiple workers, ensure all workers share the same CAS storage path.", |
272 | 1 | key.as_str() |
273 | | ); |
274 | 1 | if let StoreKey::Digest(d) = key.borrow() { |
275 | 1 | err.with_context(ErrorContext::MissingDigest { |
276 | 1 | hash: d.packed_hash().to_string(), |
277 | 1 | size: d.size_bytes() as i64, |
278 | 1 | }) |
279 | | } else { |
280 | 0 | err |
281 | | } |
282 | 1 | })? |
283 | | ) |
284 | | }; |
285 | | |
286 | 51 | let send_range = offset..length.map_or(u64::MAX, |length| length28 + offset28 ); |
287 | 51 | let mut bytes_received: u64 = 0; |
288 | 51 | let mut counted_hit = false; |
289 | | |
290 | 51 | let (mut fast_tx, fast_rx) = make_buf_channel_pair(); |
291 | 51 | let (slow_tx, mut slow_rx) = make_buf_channel_pair(); |
292 | 51 | let data_stream_fut = async move { |
293 | 51 | let mut maybe_writer_pin = maybe_writer.map(Pin::new); |
294 | | loop { |
295 | 94 | let output_buf93 = slow_rx |
296 | 94 | .recv() |
297 | 94 | .await |
298 | 94 | .err_tip(|| "Failed to read data data buffer from slow store")?1 ; |
299 | 93 | if output_buf.is_empty() { |
300 | | // Write out our EOF. |
301 | | // We are dropped as soon as we send_eof to writer_pin, so |
302 | | // we wait until we've finished all of our joins to do that. |
303 | 50 | let fast_res = fast_tx.send_eof(); |
304 | 50 | return Ok::<_, Error>((fast_res, maybe_writer_pin)); |
305 | 43 | } |
306 | | |
307 | 43 | if !counted_hit { |
308 | 43 | self.metrics |
309 | 43 | .slow_store_hit_count |
310 | 43 | .fetch_add(1, Ordering::Acquire); |
311 | 43 | counted_hit = true; |
312 | 43 | }0 |
313 | | |
314 | 43 | let output_buf_len = u64::try_from(output_buf.len()) |
315 | 43 | .err_tip(|| "Could not output_buf.len() to u64")?0 ; |
316 | 43 | self.metrics |
317 | 43 | .slow_store_downloaded_bytes |
318 | 43 | .fetch_add(output_buf_len, Ordering::Acquire); |
319 | | |
320 | 43 | let writer_fut = Self::calculate_range( |
321 | 43 | &(bytes_received..bytes_received + output_buf_len), |
322 | 43 | &send_range, |
323 | 0 | )? |
324 | 43 | .zip(maybe_writer_pin.as_mut()) |
325 | 43 | .map_or_else( |
326 | 15 | || futures::future::ready(Ok(())).left_future(), |
327 | 28 | |(range, writer_pin)| writer_pin.send(output_buf.slice(range)).right_future(), |
328 | | ); |
329 | | |
330 | 43 | bytes_received += output_buf_len; |
331 | | |
332 | 43 | let (fast_tx_res, writer_res) = join!(fast_tx.send(output_buf), writer_fut); |
333 | 43 | fast_tx_res.err_tip(|| "Failed to write to fast store in fast_slow store")?0 ; |
334 | 43 | writer_res.err_tip(|| "Failed to write result to writer in fast_slow store")?0 ; |
335 | | } |
336 | 51 | }; |
337 | | |
338 | 51 | let slow_store_fut = self.slow_store.get(key.borrow(), slow_tx); |
339 | 51 | let fast_store_fut = self |
340 | 51 | .fast_store |
341 | 51 | .update(key.borrow(), fast_rx, reader_stream_size); |
342 | | |
343 | 51 | let (data_stream_res, slow_res, fast_res) = |
344 | 51 | join!(data_stream_fut, slow_store_fut, fast_store_fut); |
345 | 51 | match data_stream_res { |
346 | 50 | Ok((fast_eof_res, maybe_writer_pin)) => |
347 | | // Sending the EOF will drop us almost immediately in bytestream_server |
348 | | // so we perform it as the very last action in this method. |
349 | | { |
350 | 50 | fast_eof_res.merge(fast_res).merge(slow_res).merge( |
351 | 50 | if let Some(mut writer_pin34 ) = maybe_writer_pin { |
352 | 34 | writer_pin.send_eof() |
353 | | } else { |
354 | 16 | Ok(()) |
355 | | }, |
356 | | ) |
357 | | } |
358 | 1 | Err(err) => match slow_res { |
359 | 1 | Err(slow_err) if slow_err.code == Code::NotFound => Err(slow_err), |
360 | 0 | _ => fast_res.merge(slow_res).merge(Err(err)), |
361 | | }, |
362 | | } |
363 | 52 | } |
364 | | |
365 | | /// Ensure our fast store is populated. This should be kept as a low |
366 | | /// cost function. Since the data itself is shared and not copied it should be fairly |
367 | | /// low cost to just discard the data, but does cost a few mutex locks while |
368 | | /// streaming. |
369 | 26 | pub async fn populate_fast_store(&self, key: StoreKey<'_>) -> Result<(), Error>0 { |
370 | 26 | let maybe_size_info = self |
371 | 26 | .fast_store |
372 | 26 | .has(key.borrow()) |
373 | 26 | .await |
374 | 26 | .err_tip(|| "While querying in populate_fast_store")?0 ; |
375 | 26 | if maybe_size_info.is_some() { |
376 | 10 | return Ok(()); |
377 | 16 | } |
378 | | |
379 | | // If the fast store is noop or read only or update only then this is an error. |
380 | 16 | if self |
381 | 16 | .fast_store |
382 | 16 | .inner_store(Some(key.borrow())) |
383 | 16 | .optimized_for(StoreOptimizations::NoopUpdates) |
384 | 16 | || self.fast_direction == StoreDirection::ReadOnly |
385 | 16 | || self.fast_direction == StoreDirection::Update |
386 | | { |
387 | 0 | return Err(make_err!( |
388 | 0 | Code::Internal, |
389 | 0 | "Attempt to populate fast store that is read only or noop" |
390 | 0 | )); |
391 | 16 | } |
392 | | |
393 | 16 | self.get_loader(key.borrow()) |
394 | 16 | .get_or_try_init(|| { |
395 | 16 | Pin::new(self).populate_and_maybe_stream(key.borrow(), None, 0, None) |
396 | 16 | }) |
397 | 16 | .await |
398 | 16 | .err_tip(|| "Failed to populate()") |
399 | 26 | } |
400 | | |
401 | | /// Returns the range of bytes that should be sent given a slice bounds |
402 | | /// offset so the output range maps the `received_range.start` to 0. |
403 | | // TODO(palfrey) This should be put into utils, as this logic is used |
404 | | // elsewhere in the code. |
405 | 53 | pub fn calculate_range( |
406 | 53 | received_range: &Range<u64>, |
407 | 53 | send_range: &Range<u64>, |
408 | 53 | ) -> Result<Option<Range<usize>>, Error> { |
409 | | // Protect against subtraction overflow. |
410 | 53 | if received_range.start >= received_range.end { |
411 | 0 | return Ok(None); |
412 | 53 | } |
413 | | |
414 | 53 | let start = max(received_range.start, send_range.start); |
415 | 53 | let end = min(received_range.end, send_range.end); |
416 | 53 | if received_range.contains(&start) && received_range51 .contains51 (&(end - 1)51 ) { |
417 | | // Offset both to the start of the received_range. |
418 | 49 | let calculated_range_start = usize::try_from(start - received_range.start) |
419 | 49 | .err_tip(|| "Could not convert (start - received_range.start) to usize")?0 ; |
420 | 49 | let calculated_range_end = usize::try_from(end - received_range.start) |
421 | 49 | .err_tip(|| "Could not convert (end - received_range.start) to usize")?0 ; |
422 | 49 | Ok(Some(calculated_range_start..calculated_range_end)) |
423 | | } else { |
424 | 4 | Ok(None) |
425 | | } |
426 | 53 | } |
427 | | } |
428 | | |
429 | | #[async_trait] |
430 | | impl StoreDriver for FastSlowStore { |
431 | 0 | async fn post_init(self: Arc<Self>) -> Result<(), Error> { |
432 | | try_join!( |
433 | | self.fast_store.clone().into_inner().post_init(), |
434 | | self.slow_store.clone().into_inner().post_init() |
435 | | )?; |
436 | | Ok(()) |
437 | 0 | } |
438 | | |
439 | | async fn has_with_results( |
440 | | self: Pin<&Self>, |
441 | | key: &[StoreKey<'_>], |
442 | | results: &mut [Option<u64>], |
443 | 24 | ) -> Result<(), Error> { |
444 | | // If our slow store is a noop store, it'll always return a 404, |
445 | | // so only check the fast store in such case. |
446 | | let slow_store = self.slow_store.inner_store::<StoreKey<'_>>(None); |
447 | | if slow_store.optimized_for(StoreOptimizations::NoopDownloads) { |
448 | | return self.fast_store.has_with_results(key, results).await; |
449 | | } |
450 | | |
451 | | // Check with the slow store first. |
452 | | self.slow_store.has_with_results(key, results).await?; |
453 | | |
454 | | // Check for any in-flight requests to the slow store next. |
455 | | let mut in_flight_futs = FuturesUnordered::new(); |
456 | | { |
457 | | let in_flight = self.in_flight_slow_writes.lock(); |
458 | | if !in_flight.is_empty() { |
459 | | for (i, (k, result)) in key.iter().zip(results.iter_mut()).enumerate() { |
460 | | if result.is_none() { |
461 | | let owned = k.borrow().into_owned(); |
462 | | if let Some(maybe_size) = in_flight.get(&owned) { |
463 | | let maybe_size = maybe_size.clone(); |
464 | 3 | in_flight_futs.push(async move { (i, *maybe_size.lock().await) }); |
465 | | } |
466 | | } |
467 | | } |
468 | | } |
469 | | } |
470 | | while let Some((i, size)) = in_flight_futs.next().await { |
471 | | results[i] = size; |
472 | | } |
473 | | |
474 | | // NOTE: We intentionally *NEVER* check the fast store, this is to |
475 | | // ensure that we re-upload data to the slow store if it only exists |
476 | | // in the fast store. This does not affect workers as they do not |
477 | | // check existence through `has` and instead go direct to loading the |
478 | | // data which bypasses the check and will load from the fast store if |
479 | | // it does not exist in the slow store. |
480 | | |
481 | | Ok(()) |
482 | 24 | } |
483 | | |
484 | | async fn update( |
485 | | self: Pin<&Self>, |
486 | | key: StoreKey<'_>, |
487 | | mut reader: DropCloserReadHalf, |
488 | | size_info: UploadSizeInfo, |
489 | 123 | ) -> Result<u64, Error> { |
490 | | // If either one of our stores is a noop store, bypass the multiplexing |
491 | | // and just use the store that is not a noop store. |
492 | | let ignore_slow = self |
493 | | .slow_store |
494 | | .inner_store(Some(key.borrow())) |
495 | | .optimized_for(StoreOptimizations::NoopUpdates) |
496 | | || self.slow_direction == StoreDirection::ReadOnly |
497 | | || self.slow_direction == StoreDirection::Get; |
498 | | let ignore_fast = self |
499 | | .fast_store |
500 | | .inner_store(Some(key.borrow())) |
501 | | .optimized_for(StoreOptimizations::NoopUpdates) |
502 | | || self.fast_direction == StoreDirection::ReadOnly |
503 | | || self.fast_direction == StoreDirection::Get; |
504 | | if ignore_slow && ignore_fast { |
505 | | // We need to drain the reader to avoid the writer complaining that we dropped |
506 | | // the connection prematurely. |
507 | | return reader.drain().await.err_tip(|| "In FastFlowStore::update"); |
508 | | } |
509 | | if ignore_slow { |
510 | | return self.fast_store.update(key, reader, size_info).await; |
511 | | } |
512 | | let slow_in_flight_guard = self.register_in_flight_slow_write(key.borrow()); |
513 | | if ignore_fast { |
514 | | let result = self |
515 | | .slow_store |
516 | | .update(key.borrow(), reader, size_info) |
517 | | .await; |
518 | | if let Ok(size) = &result { |
519 | | slow_in_flight_guard.complete(Some(*size)); |
520 | | } |
521 | | return result; |
522 | | } |
523 | | |
524 | | let (mut fast_tx, fast_rx) = make_buf_channel_pair(); |
525 | | let (mut slow_tx, slow_rx) = make_buf_channel_pair(); |
526 | | |
527 | | let key_debug = format!("{key:?}"); |
528 | | trace!( |
529 | | key = %key_debug, |
530 | | "FastSlowStore::update: starting dual-store upload", |
531 | | ); |
532 | | let update_start = std::time::Instant::now(); |
533 | | |
534 | 117 | let data_stream_fut = async move { |
535 | 117 | let mut bytes_sent: u64 = 0; |
536 | | loop { |
537 | 189 | let buffer = reader |
538 | 189 | .recv() |
539 | 189 | .await |
540 | 189 | .err_tip(|| "Failed to read buffer in fastslow store")?0 ; |
541 | 189 | if buffer.is_empty() { |
542 | | // EOF received. |
543 | 117 | fast_tx.send_eof().err_tip( |
544 | | || "Failed to write eof to fast store in fast_slow store update", |
545 | 0 | )?; |
546 | 117 | slow_tx |
547 | 117 | .send_eof() |
548 | 117 | .err_tip(|| "Failed to write eof to writer in fast_slow store update")?0 ; |
549 | 117 | debug!( |
550 | | total_bytes = bytes_sent, |
551 | | "FastSlowStore::update: data_stream sent EOF to both stores", |
552 | | ); |
553 | 117 | return Result::<u64, Error>::Ok(bytes_sent); |
554 | 72 | } |
555 | | |
556 | 72 | let chunk_len = buffer.len(); |
557 | 72 | let send_start = std::time::Instant::now(); |
558 | 72 | let (fast_result, slow_result) = |
559 | 72 | join!(fast_tx.send(buffer.clone()), slow_tx.send(buffer)); |
560 | 72 | let send_elapsed = send_start.elapsed(); |
561 | 72 | if send_elapsed.as_secs() >= 5 { |
562 | 0 | warn!( |
563 | | chunk_len, |
564 | 0 | send_elapsed_ms = send_elapsed.as_millis(), |
565 | | total_bytes = bytes_sent, |
566 | | "FastSlowStore::update: channel send stalled (>5s). A downstream store may be hanging", |
567 | | ); |
568 | 72 | } |
569 | 72 | bytes_sent += u64::try_from(chunk_len).unwrap_or(u64::MAX); |
570 | 72 | fast_result |
571 | 72 | .map_err(|e| {0 |
572 | 0 | make_err!( |
573 | 0 | Code::Internal, |
574 | | "Failed to send message to fast_store in fast_slow_store {:?}", |
575 | | e |
576 | | ) |
577 | 0 | }) |
578 | 72 | .merge(slow_result.map_err(|e| {0 |
579 | 0 | make_err!( |
580 | 0 | Code::Internal, |
581 | | "Failed to send message to slow_store in fast_slow store {:?}", |
582 | | e |
583 | | ) |
584 | 0 | }))?; |
585 | | } |
586 | 117 | }; |
587 | | |
588 | | let fast_store_fut = self.fast_store.update(key.borrow(), fast_rx, size_info); |
589 | | let slow_store_fut = self.slow_store.update(key.borrow(), slow_rx, size_info); |
590 | | |
591 | | let (data_stream_res, fast_res, slow_res) = |
592 | | join!(data_stream_fut, fast_store_fut, slow_store_fut); |
593 | | |
594 | | if let Ok(size) = slow_res { |
595 | | slow_in_flight_guard.complete(Some(size)); |
596 | | } else { |
597 | | drop(slow_in_flight_guard); |
598 | | } |
599 | | |
600 | | let total_elapsed = update_start.elapsed(); |
601 | | if data_stream_res.is_err() || fast_res.is_err() || slow_res.is_err() { |
602 | | let all_not_found = [&data_stream_res, &fast_res, &slow_res] |
603 | | .iter() |
604 | 0 | .all(|r| match r { |
605 | 0 | Ok(_size) => true, |
606 | 0 | Err(e) => e.code == Code::NotFound, |
607 | 0 | }); |
608 | | if all_not_found { |
609 | | info!( |
610 | | key = %key_debug, |
611 | | elapsed_ms = total_elapsed.as_millis(), |
612 | | data_stream_ok = data_stream_res.is_ok(), |
613 | | fast_store_ok = fast_res.is_ok(), |
614 | | slow_store_ok = slow_res.is_ok(), |
615 | | "FastSlowStore::update: completed with NotFound error(s)", |
616 | | ); |
617 | | } else { |
618 | | warn!( |
619 | | key = %key_debug, |
620 | | elapsed_ms = total_elapsed.as_millis(), |
621 | | data_stream_ok = data_stream_res.is_ok(), |
622 | | fast_store_ok = fast_res.is_ok(), |
623 | | slow_store_ok = slow_res.is_ok(), |
624 | | "FastSlowStore::update: completed with error(s)", |
625 | | ); |
626 | | } |
627 | | } else { |
628 | | trace!( |
629 | | key = %key_debug, |
630 | | elapsed_ms = total_elapsed.as_millis(), |
631 | | "FastSlowStore::update: completed successfully", |
632 | | ); |
633 | | } |
634 | | data_stream_res.merge(fast_res).merge(slow_res) |
635 | 123 | } |
636 | | |
637 | | /// `FastSlowStore` has optimizations for dealing with files. |
638 | 0 | fn optimized_for(&self, optimization: StoreOptimizations) -> bool { |
639 | 0 | optimization == StoreOptimizations::FileUpdates |
640 | 0 | } |
641 | | |
642 | | /// Optimized variation to consume the file if one of the stores is a |
643 | | /// filesystem store. This makes the operation a move instead of a copy |
644 | | /// dramatically increasing performance for large files. |
645 | | async fn update_with_whole_file( |
646 | | self: Pin<&Self>, |
647 | | key: StoreKey<'_>, |
648 | | path: OsString, |
649 | | mut file: fs::FileSlot, |
650 | | upload_size: UploadSizeInfo, |
651 | 10 | ) -> Result<(u64, Option<fs::FileSlot>), Error> { |
652 | | trace!( |
653 | | key = ?key, |
654 | | ?upload_size, |
655 | | "FastSlowStore::update_with_whole_file: starting", |
656 | | ); |
657 | | if self |
658 | | .fast_store |
659 | | .optimized_for(StoreOptimizations::FileUpdates) |
660 | | { |
661 | | if !self |
662 | | .slow_store |
663 | | .inner_store(Some(key.borrow())) |
664 | | .optimized_for(StoreOptimizations::NoopUpdates) |
665 | | && self.slow_direction != StoreDirection::ReadOnly |
666 | | && self.slow_direction != StoreDirection::Get |
667 | | { |
668 | | trace!("FastSlowStore::update_with_whole_file: uploading to slow_store"); |
669 | | let slow_start = std::time::Instant::now(); |
670 | | let slow_in_flight_guard = self.register_in_flight_slow_write(key.borrow()); |
671 | | let size = slow_update_store_with_file( |
672 | | self.slow_store.as_store_driver_pin(), |
673 | | key.borrow(), |
674 | | &mut file, |
675 | | upload_size, |
676 | | ) |
677 | | .await |
678 | | .err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")?; |
679 | | slow_in_flight_guard.complete(Some(size)); |
680 | | trace!( |
681 | | elapsed_ms = slow_start.elapsed().as_millis(), |
682 | | "FastSlowStore::update_with_whole_file: slow_store upload completed", |
683 | | ); |
684 | | } |
685 | | if self.fast_direction == StoreDirection::ReadOnly |
686 | | || self.fast_direction == StoreDirection::Get |
687 | | { |
688 | | let file_size = match upload_size { |
689 | | UploadSizeInfo::ExactSize(size) => size, |
690 | | UploadSizeInfo::MaxSize(_) => file |
691 | | .as_ref() |
692 | | .metadata() |
693 | | .await |
694 | 0 | .err_tip(|| format!("While reading metadata for {}", path.display()))? |
695 | | .len(), |
696 | | }; |
697 | | return Ok((file_size, Some(file))); |
698 | | } |
699 | | return self |
700 | | .fast_store |
701 | | .update_with_whole_file(key, path, file, upload_size) |
702 | | .await; |
703 | | } |
704 | | |
705 | | if self |
706 | | .slow_store |
707 | | .optimized_for(StoreOptimizations::FileUpdates) |
708 | | { |
709 | | let ignore_fast = self |
710 | | .fast_store |
711 | | .inner_store(Some(key.borrow())) |
712 | | .optimized_for(StoreOptimizations::NoopUpdates) |
713 | | || self.fast_direction == StoreDirection::ReadOnly |
714 | | || self.fast_direction == StoreDirection::Get; |
715 | | let maybe_size = if ignore_fast { |
716 | | None |
717 | | } else { |
718 | | Some( |
719 | | slow_update_store_with_file( |
720 | | self.fast_store.as_store_driver_pin(), |
721 | | key.borrow(), |
722 | | &mut file, |
723 | | upload_size, |
724 | | ) |
725 | | .await |
726 | | .err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?, |
727 | | ) |
728 | | }; |
729 | | let ignore_slow = self.slow_direction == StoreDirection::ReadOnly |
730 | | || self.slow_direction == StoreDirection::Get; |
731 | | if ignore_slow { |
732 | | let size = match maybe_size { |
733 | | Some(size) => size, |
734 | | None => match upload_size { |
735 | | UploadSizeInfo::ExactSize(size) => size, |
736 | | UploadSizeInfo::MaxSize(_) => file |
737 | | .as_ref() |
738 | | .metadata() |
739 | | .await |
740 | 0 | .err_tip(|| format!("While reading metadata for {}", path.display()))? |
741 | | .len(), |
742 | | }, |
743 | | }; |
744 | | return Ok((size, Some(file))); |
745 | | } |
746 | | let slow_in_flight_guard: InFlightSlowWriteGuard = |
747 | | self.register_in_flight_slow_write(key.borrow()); |
748 | | let (size, maybe_file_slot) = self |
749 | | .slow_store |
750 | | .update_with_whole_file(key.borrow(), path, file, upload_size) |
751 | | .await?; |
752 | | slow_in_flight_guard.complete(Some(size)); |
753 | | return Ok((size, maybe_file_slot)); |
754 | | } |
755 | | |
756 | | let size = slow_update_store_with_file(self, key, &mut file, upload_size) |
757 | | .await |
758 | | .err_tip(|| "In FastSlowStore::update_with_whole_file")?; |
759 | | Ok((size, Some(file))) |
760 | 10 | } |
761 | | |
762 | | async fn get_part( |
763 | | self: Pin<&Self>, |
764 | | key: StoreKey<'_>, |
765 | | writer: &mut DropCloserWriteHalf, |
766 | | offset: u64, |
767 | | length: Option<u64>, |
768 | 183 | ) -> Result<(), Error> { |
769 | | // `has()` can report a stale map entry whose file is gone, so |
770 | | // get_part may still return NotFound; fall through to the slow |
771 | | // store unless we have already streamed bytes to the caller. |
772 | | if self.fast_store.has(key.borrow()).await?.is_some() { |
773 | | let bytes_before = writer.get_bytes_written(); |
774 | | match self |
775 | | .fast_store |
776 | | .get_part(key.borrow(), writer.borrow_mut(), offset, length) |
777 | | .await |
778 | | { |
779 | | Ok(()) => { |
780 | | self.metrics |
781 | | .fast_store_hit_count |
782 | | .fetch_add(1, Ordering::Acquire); |
783 | | self.metrics |
784 | | .fast_store_downloaded_bytes |
785 | | .fetch_add(writer.get_bytes_written(), Ordering::Acquire); |
786 | | return Ok(()); |
787 | | } |
788 | | Err(e) |
789 | | if e.code == Code::NotFound && writer.get_bytes_written() == bytes_before => |
790 | | { |
791 | | self.metrics |
792 | | .fast_store_stale_map_falls_through |
793 | | .fetch_add(1, Ordering::Acquire); |
794 | | warn!(%key, ?e, "Stale fast-store map entry; falling through to slow store"); |
795 | | // fall through to populate path |
796 | | } |
797 | | Err(e) => return Err(e), |
798 | | } |
799 | | } |
800 | | |
801 | | // If the fast store is noop or read only or update only then bypass it. |
802 | | if self |
803 | | .fast_store |
804 | | .inner_store(Some(key.borrow())) |
805 | | .optimized_for(StoreOptimizations::NoopUpdates) |
806 | | || self.fast_direction == StoreDirection::ReadOnly |
807 | | || self.fast_direction == StoreDirection::Update |
808 | | { |
809 | | self.metrics |
810 | | .slow_store_hit_count |
811 | | .fetch_add(1, Ordering::Acquire); |
812 | | self.slow_store |
813 | | .get_part(key, writer.borrow_mut(), offset, length) |
814 | | .await?; |
815 | | self.metrics |
816 | | .slow_store_downloaded_bytes |
817 | | .fetch_add(writer.get_bytes_written(), Ordering::Acquire); |
818 | | return Ok(()); |
819 | | } |
820 | | |
821 | | // Huge blobs: dedup is a net loss (followers time out anyway and |
822 | | // the fast tier is evicted), so read straight from the slow store. |
823 | | if self.should_bypass_dedup(&key) { |
824 | | self.metrics |
825 | | .huge_blob_dedup_bypasses |
826 | | .fetch_add(1, Ordering::Acquire); |
827 | | self.metrics |
828 | | .slow_store_hit_count |
829 | | .fetch_add(1, Ordering::Acquire); |
830 | | debug!(%key, threshold_bytes = self.bypass_dedup_threshold_bytes, "Bypassing dedup for huge blob"); |
831 | | self.slow_store |
832 | | .get_part(key, writer.borrow_mut(), offset, length) |
833 | | .await |
834 | | .err_tip(|| "In FastSlowStore::get_part huge-blob bypass")?; |
835 | | self.metrics |
836 | | .slow_store_downloaded_bytes |
837 | | .fetch_add(writer.get_bytes_written(), Ordering::Acquire); |
838 | | return Ok(()); |
839 | | } |
840 | | |
841 | | let mut writer = Some(writer); |
842 | | |
843 | | // Drive the dedup loader. Two distinct paths: |
844 | | // |
845 | | // * Leader (created the OnceCell entry): runs `populate` with |
846 | | // OUR `writer`, streaming directly to the caller while |
847 | | // filling the fast cache. No timeout: a multi-GB blob |
848 | | // legitimately takes minutes to stream, and cancelling our |
849 | | // own populate would propagate the failure to every other |
850 | | // reader of this digest. |
851 | | // |
852 | | // * Follower: bound the wait so a wedged leader does not pin |
853 | | // us until the upstream `gRPC` deadline fires. The follower |
854 | | // closure passes `None` for `writer`. This is critical: if |
855 | | // the OnceCell ever promotes our follower closure to leader |
856 | | // (because the original leader's future was dropped), and |
857 | | // our `tokio::time::timeout` then cancels it, *no* caller |
858 | | // `writer` was ever moved into the populate stream, so no |
859 | | // `gRPC` sender is dropped without EOF. The follower then |
860 | | // re-enters `get_part` below and reads from the now-warm |
861 | | // fast cache, OR falls back to the slow store on timeout. |
862 | | let needs_slow_store_fallback: bool = { |
863 | | let loader_guard = self.get_loader(key.borrow()); |
864 | | let is_leader = loader_guard.is_leader; |
865 | | if is_leader { |
866 | | loader_guard |
867 | 36 | .get_or_try_init(|| { |
868 | 36 | self.populate_and_maybe_stream(key.borrow(), writer.take(), offset, length) |
869 | 36 | }) |
870 | | .await?; |
871 | | false |
872 | | } else { |
873 | 0 | let load_fut = loader_guard.get_or_try_init(|| { |
874 | 0 | self.populate_and_maybe_stream(key.borrow(), None, offset, length) |
875 | 0 | }); |
876 | | match tokio::time::timeout(LEADER_WAIT_TIMEOUT, load_fut).await { |
877 | | Ok(result) => { |
878 | | result?; |
879 | | false |
880 | | } |
881 | | Err(_elapsed) => { |
882 | | self.metrics |
883 | | .leader_wait_timeouts |
884 | | .fetch_add(1, Ordering::Acquire); |
885 | | warn!( |
886 | | %key, |
887 | | timeout_secs = LEADER_WAIT_TIMEOUT.as_secs(), |
888 | | "FastSlowStore::get_part: leader-wait exceeded timeout, bypassing dedup and reading slow store directly", |
889 | | ); |
890 | | true |
891 | | } |
892 | | } |
893 | | } |
894 | | }; |
895 | | |
896 | | if needs_slow_store_fallback && let Some(writer) = writer.take() { |
897 | | return self |
898 | | .slow_store |
899 | | .get_part(key, writer, offset, length) |
900 | | .await |
901 | | .err_tip( |
902 | | || "In FastSlowStore::get_part slow_store fallback after leader-wait timeout", |
903 | | ); |
904 | | } |
905 | | |
906 | | // If we didn't stream then re-enter which will stream from the fast |
907 | | // store, or retry the download. We should not get in a loop here |
908 | | // because OnceCell has the good sense to retry for all callers so in |
909 | | // order to get here the fast store will have been populated. There's |
910 | | // an outside chance it was evicted, but that's slim. |
911 | | if let Some(writer) = writer.take() { |
912 | | self.get_part(key, writer, offset, length).await |
913 | | } else { |
914 | | // This was the thread that did the streaming already, lucky duck. |
915 | | Ok(()) |
916 | | } |
917 | 183 | } |
918 | | |
919 | 2 | fn inner_store(&self, _key: Option<StoreKey>) -> &dyn StoreDriver { |
920 | 2 | self |
921 | 2 | } |
922 | | |
923 | 2 | fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) { |
924 | 2 | self |
925 | 2 | } |
926 | | |
927 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> { |
928 | 0 | self |
929 | 0 | } |
930 | | |
931 | 0 | fn register_remove_callback( |
932 | 0 | self: Arc<Self>, |
933 | 0 | callback: Arc<dyn RemoveItemCallback>, |
934 | 0 | ) -> Result<(), Error> { |
935 | 0 | self.fast_store.register_remove_callback(callback.clone())?; |
936 | 0 | self.slow_store.register_remove_callback(callback)?; |
937 | 0 | Ok(()) |
938 | 0 | } |
939 | | } |
940 | | |
941 | | #[derive(Debug, Default, MetricsComponent)] |
942 | | struct FastSlowStoreMetrics { |
943 | | #[metric(help = "Hit count for the fast store")] |
944 | | fast_store_hit_count: AtomicU64, |
945 | | #[metric(help = "Downloaded bytes from the fast store")] |
946 | | fast_store_downloaded_bytes: AtomicU64, |
947 | | #[metric(help = "Hit count for the slow store")] |
948 | | slow_store_hit_count: AtomicU64, |
949 | | #[metric(help = "Downloaded bytes from the slow store")] |
950 | | slow_store_downloaded_bytes: AtomicU64, |
951 | | #[metric( |
952 | | help = "Number of times a follower bypassed the populating-digests dedup because the leader exceeded LEADER_WAIT_TIMEOUT" |
953 | | )] |
954 | | leader_wait_timeouts: AtomicU64, |
955 | | #[metric(help = "get_part calls that bypassed dedup for huge blobs")] |
956 | | huge_blob_dedup_bypasses: AtomicU64, |
957 | | #[metric(help = "Stale fast-store map entries that fell through to the slow store")] |
958 | | fast_store_stale_map_falls_through: AtomicU64, |
959 | | } |
960 | | |
961 | | /// Maximum time a follower will wait on the leader-populator before |
962 | | /// bypassing the dedup map and reading directly from the slow store. |
963 | | /// |
964 | | /// Without this bound a single wedged populator would block every |
965 | | /// concurrent reader of the same digest until each one's own `gRPC` |
966 | | /// deadline fired (e.g. Bazel's `--remote_timeout`), turning a |
967 | | /// single slow read into a fan-out of `DEADLINE_EXCEEDED` errors. |
968 | | const LEADER_WAIT_TIMEOUT: Duration = Duration::from_mins(1); |
969 | | |
970 | | default_health_status_indicator!(FastSlowStore); |