/build/source/nativelink-store/src/fast_slow_store.rs
Line | Count | Source |
1 | | // Copyright 2024-2025 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::borrow::BorrowMut; |
16 | | use core::cmp::{max, min}; |
17 | | use core::ops::Range; |
18 | | use core::pin::Pin; |
19 | | use core::sync::atomic::{AtomicU64, Ordering}; |
20 | | use core::time::Duration; |
21 | | use std::collections::HashMap; |
22 | | use std::ffi::OsString; |
23 | | use std::sync::{Arc, Weak}; |
24 | | |
25 | | use async_trait::async_trait; |
26 | | use futures::{FutureExt, join}; |
27 | | use nativelink_config::stores::{FastSlowSpec, StoreDirection}; |
28 | | use nativelink_error::{Code, Error, ErrorContext, ResultExt, make_err}; |
29 | | use nativelink_metric::MetricsComponent; |
30 | | use nativelink_util::buf_channel::{ |
31 | | DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair, |
32 | | }; |
33 | | use nativelink_util::fs; |
34 | | use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator}; |
35 | | use nativelink_util::store_trait::{ |
36 | | RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations, |
37 | | UploadSizeInfo, slow_update_store_with_file, |
38 | | }; |
39 | | use parking_lot::Mutex; |
40 | | use tokio::sync::OnceCell; |
41 | | use tracing::{debug, info, trace, warn}; |
42 | | |
43 | | // TODO(palfrey) This store needs to be evaluated for more efficient memory usage, |
44 | | // there are many copies happening internally. |
45 | | |
46 | | type Loader = Arc<OnceCell<()>>; |
47 | | |
48 | | // TODO(palfrey) We should consider copying the data in the background to allow the |
49 | | // client to hang up while the data is buffered. An alternative is to possibly make a |
50 | | // "BufferedStore" that could be placed on the "slow" store that would hang up early |
51 | | // if data is in the buffer. |
/// A store that pairs a `fast_store` with a `slow_store`.
///
/// Reads (`get_part`) consult the fast store first and otherwise stream from
/// the slow store while populating the fast store. Writes (`update`) are
/// multiplexed to both stores unless a store is a noop or its configured
/// `StoreDirection` excludes it.
#[derive(Debug, MetricsComponent)]
pub struct FastSlowStore {
    // Store consulted first on reads and populated from the slow store on a miss.
    #[metric(group = "fast_store")]
    fast_store: Store,
    // Direction restriction for the fast store, copied from `FastSlowSpec`.
    fast_direction: StoreDirection,
    // Backing store that reads fall back to when the fast store misses.
    #[metric(group = "slow_store")]
    slow_store: Store,
    // Direction restriction for the slow store, copied from `FastSlowSpec`.
    slow_direction: StoreDirection,
    // Weak self-reference (set via `Arc::new_cyclic`) handed to the RAII
    // guards so they can reach the store's maps from `Drop`.
    weak_self: Weak<Self>,
    // Hit / byte counters updated on the read paths.
    #[metric]
    metrics: FastSlowStoreMetrics,
    // De-duplicate requests for the fast store, only the first streams, others
    // are blocked. This may feel like it's causing a slow down of tasks, but
    // actually it's faster because we're not downloading the file multiple
    // times or doing loads of duplicate IO.
    populating_digests: Mutex<HashMap<StoreKey<'static>, Loader>>,
    // Tracks keys whose slow-store write is currently in flight, along with
    // the best-known size. Consulted by `has_with_results` so that a
    // concurrent writer that has not yet finished pushing to the slow store
    // is still visible to a concurrent existence check, preventing redundant
    // duplicate uploads of the same blob.
    in_flight_slow_writes: Mutex<HashMap<StoreKey<'static>, u64>>,
}
75 | | |
// Guard handed out by `FastSlowStore::get_loader`. Its `Drop` impl removes
// the matching `populating_digests` entry once no other guard shares the
// loader, so the map is cleaned up even if the owning future is dropped
// (i.e. it is cancel safe).
struct LoaderGuard<'a> {
    // Weak handle back to the store whose `populating_digests` map we clean up.
    weak_store: Weak<FastSlowStore>,
    // Key this guard was created for; used to find the map entry on drop.
    key: StoreKey<'a>,
    // Shared once-cell for this key. Wrapped in `Option` so `Drop` can take it.
    loader: Option<Loader>,
    // True when the `get_loader` call that built this guard inserted the map
    // entry (rather than finding an existing one).
    is_leader: bool,
}
84 | | |
85 | | impl LoaderGuard<'_> { |
86 | 35 | async fn get_or_try_init<E, F, Fut>(&self, f: F) -> Result<(), E> |
87 | 35 | where |
88 | 35 | F: FnOnce() -> Fut, |
89 | 35 | Fut: Future<Output = Result<(), E>>, |
90 | 35 | { |
91 | 35 | if let Some(loader) = &self.loader { |
92 | 35 | loader.get_or_try_init(f).await.map34 (|&()| ()) |
93 | | } else { |
94 | | // This is impossible, but we do it anyway. |
95 | 0 | f().await |
96 | | } |
97 | 34 | } |
98 | | } |
99 | | |
/// Cancel-safe RAII guard that removes an entry from
/// `FastSlowStore::in_flight_slow_writes` when dropped. This ensures the
/// map does not leak entries if the surrounding `update()` future is
/// cancelled before the slow-store write completes.
struct InFlightSlowWriteGuard {
    /// Weak handle to the owning store; if it can no longer be upgraded the
    /// drop is a no-op.
    weak_store: Weak<FastSlowStore>,
    /// Key registered in the in-flight map. `Option` so `Drop` can move it out.
    key: Option<StoreKey<'static>>,
}
108 | | |
109 | | impl Drop for InFlightSlowWriteGuard { |
110 | 131 | fn drop(&mut self) { |
111 | 131 | let Some(store) = self.weak_store.upgrade() else { |
112 | 0 | return; |
113 | | }; |
114 | 131 | let Some(key) = self.key.take() else { |
115 | 0 | return; |
116 | | }; |
117 | 131 | store.in_flight_slow_writes.lock().remove(&key); |
118 | 131 | } |
119 | | } |
120 | | |
121 | | impl Drop for LoaderGuard<'_> { |
122 | 35 | fn drop(&mut self) { |
123 | 35 | let Some(store) = self.weak_store.upgrade() else { |
124 | | // The store has already gone away, nothing to remove from. |
125 | 0 | return; |
126 | | }; |
127 | 35 | let Some(loader) = self.loader.take() else { |
128 | | // This should never happen, but we do it to be safe. |
129 | 0 | return; |
130 | | }; |
131 | | |
132 | 35 | let mut guard = store.populating_digests.lock(); |
133 | 35 | if let std::collections::hash_map::Entry::Occupied(occupied_entry) = |
134 | 35 | guard.entry(self.key.borrow().into_owned()) |
135 | 35 | && Arc::ptr_eq(occupied_entry.get(), &loader) |
136 | | { |
137 | 35 | drop(loader); |
138 | 35 | if Arc::strong_count(occupied_entry.get()) == 1 { |
139 | 19 | // This is the last loader, so remove it. |
140 | 19 | occupied_entry.remove(); |
141 | 19 | }16 |
142 | 0 | } |
143 | 35 | } |
144 | | } |
145 | | |
146 | | impl FastSlowStore { |
147 | 54 | pub fn new(spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> { |
148 | 54 | Arc::new_cyclic(|weak_self| Self { |
149 | 54 | fast_store, |
150 | 54 | fast_direction: spec.fast_direction, |
151 | 54 | slow_store, |
152 | 54 | slow_direction: spec.slow_direction, |
153 | 54 | weak_self: weak_self.clone(), |
154 | 54 | metrics: FastSlowStoreMetrics::default(), |
155 | 54 | populating_digests: Mutex::new(HashMap::new()), |
156 | 54 | in_flight_slow_writes: Mutex::new(HashMap::new()), |
157 | 54 | }) |
158 | 54 | } |
159 | | |
160 | | /// Best-effort size for tracking in-flight slow writes. Falls back to 0 |
161 | | /// when neither the upload size nor the key carries an exact size |
162 | | /// (e.g. `MaxSize` for a string key). Callers of `has_with_results` only |
163 | | /// rely on `Some(_)` vs `None`, so a size of 0 still correctly signals |
164 | | /// "this blob exists". |
165 | 131 | const fn track_size(key: &StoreKey<'_>, size_info: UploadSizeInfo) -> u64 { |
166 | 131 | match size_info { |
167 | 131 | UploadSizeInfo::ExactSize(s) => s, |
168 | 0 | UploadSizeInfo::MaxSize(_) => match key { |
169 | 0 | StoreKey::Digest(d) => d.size_bytes(), |
170 | 0 | StoreKey::Str(_) => 0, |
171 | | }, |
172 | | } |
173 | 131 | } |
174 | | |
175 | 131 | fn register_in_flight_slow_write( |
176 | 131 | &self, |
177 | 131 | key: StoreKey<'_>, |
178 | 131 | size: u64, |
179 | 131 | ) -> InFlightSlowWriteGuard { |
180 | 131 | let owned = key.into_owned(); |
181 | 131 | self.in_flight_slow_writes |
182 | 131 | .lock() |
183 | 131 | .insert(owned.borrow().into_owned(), size); |
184 | 131 | InFlightSlowWriteGuard { |
185 | 131 | weak_store: self.weak_self.clone(), |
186 | 131 | key: Some(owned), |
187 | 131 | } |
188 | 131 | } |
189 | | |
190 | 0 | pub const fn fast_store(&self) -> &Store { |
191 | 0 | &self.fast_store |
192 | 0 | } |
193 | | |
194 | 0 | pub const fn slow_store(&self) -> &Store { |
195 | 0 | &self.slow_store |
196 | 0 | } |
197 | | |
198 | 2 | pub fn get_arc(&self) -> Option<Arc<Self>> { |
199 | 2 | self.weak_self.upgrade() |
200 | 2 | } |
201 | | |
202 | 35 | fn get_loader<'a>(&self, key: StoreKey<'a>) -> LoaderGuard<'a> { |
203 | | // Get a single loader instance that's used to populate the fast store |
204 | | // for this digest. If another request comes in then it's de-duplicated. |
205 | 35 | let mut is_leader = false; |
206 | 35 | let loader = match self |
207 | 35 | .populating_digests |
208 | 35 | .lock() |
209 | 35 | .entry(key.borrow().into_owned()) |
210 | | { |
211 | 16 | std::collections::hash_map::Entry::Occupied(occupied_entry) => { |
212 | 16 | occupied_entry.get().clone() |
213 | | } |
214 | 19 | std::collections::hash_map::Entry::Vacant(vacant_entry) => { |
215 | 19 | is_leader = true; |
216 | 19 | vacant_entry.insert(Arc::new(OnceCell::new())).clone() |
217 | | } |
218 | | }; |
219 | 35 | LoaderGuard { |
220 | 35 | weak_store: self.weak_self.clone(), |
221 | 35 | key, |
222 | 35 | loader: Some(loader), |
223 | 35 | is_leader, |
224 | 35 | } |
225 | 35 | } |
226 | | |
227 | 19 | async fn populate_and_maybe_stream( |
228 | 19 | self: Pin<&Self>, |
229 | 19 | key: StoreKey<'_>, |
230 | 19 | maybe_writer: Option<&mut DropCloserWriteHalf>, |
231 | 19 | offset: u64, |
232 | 19 | length: Option<u64>, |
233 | 19 | ) -> Result<(), Error> { |
234 | 19 | let reader_stream_size = if self |
235 | 19 | .slow_store |
236 | 19 | .inner_store(Some(key.borrow())) |
237 | 19 | .optimized_for(StoreOptimizations::LazyExistenceOnSync) |
238 | | { |
239 | 2 | trace!( |
240 | | %key, |
241 | 2 | store_name = %self.slow_store.inner_store(Some(key.borrow())).get_name(), |
242 | | "Skipping .has() check due to LazyExistenceOnSync optimization" |
243 | | ); |
244 | 2 | UploadSizeInfo::MaxSize(u64::MAX) |
245 | | } else { |
246 | 17 | UploadSizeInfo::ExactSize(self |
247 | 17 | .slow_store |
248 | 17 | .has(key.borrow()) |
249 | 17 | .await |
250 | 17 | .err_tip(|| "Failed to run has() on slow store")?0 |
251 | 17 | .ok_or_else(|| {0 |
252 | 0 | let err = make_err!( |
253 | 0 | Code::NotFound, |
254 | | "Object {} not found in either fast or slow store. \ |
255 | | If using multiple workers, ensure all workers share the same CAS storage path.", |
256 | 0 | key.as_str() |
257 | | ); |
258 | 0 | if let StoreKey::Digest(d) = key.borrow() { |
259 | 0 | err.with_context(ErrorContext::MissingDigest { |
260 | 0 | hash: d.packed_hash().to_string(), |
261 | 0 | size: d.size_bytes() as i64, |
262 | 0 | }) |
263 | | } else { |
264 | 0 | err |
265 | | } |
266 | 0 | })? |
267 | | ) |
268 | | }; |
269 | | |
270 | 19 | let send_range = offset..length.map_or(u64::MAX, |length| length9 + offset9 ); |
271 | 19 | let mut bytes_received: u64 = 0; |
272 | 19 | let mut counted_hit = false; |
273 | | |
274 | 19 | let (mut fast_tx, fast_rx) = make_buf_channel_pair(); |
275 | 19 | let (slow_tx, mut slow_rx) = make_buf_channel_pair(); |
276 | 19 | let data_stream_fut = async move { |
277 | 19 | let mut maybe_writer_pin = maybe_writer.map(Pin::new); |
278 | | loop { |
279 | 36 | let output_buf35 = slow_rx |
280 | 36 | .recv() |
281 | 36 | .await |
282 | 36 | .err_tip(|| "Failed to read data data buffer from slow store")?1 ; |
283 | 35 | if output_buf.is_empty() { |
284 | | // Write out our EOF. |
285 | | // We are dropped as soon as we send_eof to writer_pin, so |
286 | | // we wait until we've finished all of our joins to do that. |
287 | 18 | let fast_res = fast_tx.send_eof(); |
288 | 18 | return Ok::<_, Error>((fast_res, maybe_writer_pin)); |
289 | 17 | } |
290 | | |
291 | 17 | if !counted_hit { |
292 | 17 | self.metrics |
293 | 17 | .slow_store_hit_count |
294 | 17 | .fetch_add(1, Ordering::Acquire); |
295 | 17 | counted_hit = true; |
296 | 17 | }0 |
297 | | |
298 | 17 | let output_buf_len = u64::try_from(output_buf.len()) |
299 | 17 | .err_tip(|| "Could not output_buf.len() to u64")?0 ; |
300 | 17 | self.metrics |
301 | 17 | .slow_store_downloaded_bytes |
302 | 17 | .fetch_add(output_buf_len, Ordering::Acquire); |
303 | | |
304 | 17 | let writer_fut = Self::calculate_range( |
305 | 17 | &(bytes_received..bytes_received + output_buf_len), |
306 | 17 | &send_range, |
307 | 0 | )? |
308 | 17 | .zip(maybe_writer_pin.as_mut()) |
309 | 17 | .map_or_else( |
310 | 5 | || futures::future::ready(Ok(())).left_future(), |
311 | 12 | |(range, writer_pin)| writer_pin.send(output_buf.slice(range)).right_future(), |
312 | | ); |
313 | | |
314 | 17 | bytes_received += output_buf_len; |
315 | | |
316 | 17 | let (fast_tx_res, writer_res) = join!(fast_tx.send(output_buf), writer_fut); |
317 | 17 | fast_tx_res.err_tip(|| "Failed to write to fast store in fast_slow store")?0 ; |
318 | 17 | writer_res.err_tip(|| "Failed to write result to writer in fast_slow store")?0 ; |
319 | | } |
320 | 19 | }; |
321 | | |
322 | 19 | let slow_store_fut = self.slow_store.get(key.borrow(), slow_tx); |
323 | 19 | let fast_store_fut = self |
324 | 19 | .fast_store |
325 | 19 | .update(key.borrow(), fast_rx, reader_stream_size); |
326 | | |
327 | 19 | let (data_stream_res, slow_res, fast_res) = |
328 | 19 | join!(data_stream_fut, slow_store_fut, fast_store_fut); |
329 | 19 | match data_stream_res { |
330 | 18 | Ok((fast_eof_res, maybe_writer_pin)) => |
331 | | // Sending the EOF will drop us almost immediately in bytestream_server |
332 | | // so we perform it as the very last action in this method. |
333 | | { |
334 | 18 | fast_eof_res.merge(fast_res).merge(slow_res).merge( |
335 | 18 | if let Some(mut writer_pin13 ) = maybe_writer_pin { |
336 | 13 | writer_pin.send_eof() |
337 | | } else { |
338 | 5 | Ok(()) |
339 | | }, |
340 | | ) |
341 | | } |
342 | 1 | Err(err) => match slow_res { |
343 | 1 | Err(slow_err) if slow_err.code == Code::NotFound => Err(slow_err), |
344 | 0 | _ => fast_res.merge(slow_res).merge(Err(err)), |
345 | | }, |
346 | | } |
347 | 19 | } |
348 | | |
349 | | /// Ensure our fast store is populated. This should be kept as a low |
350 | | /// cost function. Since the data itself is shared and not copied it should be fairly |
351 | | /// low cost to just discard the data, but does cost a few mutex locks while |
352 | | /// streaming. |
353 | 12 | pub async fn populate_fast_store(&self, key: StoreKey<'_>) -> Result<(), Error> { |
354 | 12 | let maybe_size_info = self |
355 | 12 | .fast_store |
356 | 12 | .has(key.borrow()) |
357 | 12 | .await |
358 | 12 | .err_tip(|| "While querying in populate_fast_store")?0 ; |
359 | 12 | if maybe_size_info.is_some() { |
360 | 7 | return Ok(()); |
361 | 5 | } |
362 | | |
363 | | // If the fast store is noop or read only or update only then this is an error. |
364 | 5 | if self |
365 | 5 | .fast_store |
366 | 5 | .inner_store(Some(key.borrow())) |
367 | 5 | .optimized_for(StoreOptimizations::NoopUpdates) |
368 | 5 | || self.fast_direction == StoreDirection::ReadOnly |
369 | 5 | || self.fast_direction == StoreDirection::Update |
370 | | { |
371 | 0 | return Err(make_err!( |
372 | 0 | Code::Internal, |
373 | 0 | "Attempt to populate fast store that is read only or noop" |
374 | 0 | )); |
375 | 5 | } |
376 | | |
377 | 5 | self.get_loader(key.borrow()) |
378 | 5 | .get_or_try_init(|| { |
379 | 5 | Pin::new(self).populate_and_maybe_stream(key.borrow(), None, 0, None) |
380 | 5 | }) |
381 | 5 | .await |
382 | 5 | .err_tip(|| "Failed to populate()") |
383 | 12 | } |
384 | | |
385 | | /// Returns the range of bytes that should be sent given a slice bounds |
386 | | /// offset so the output range maps the `received_range.start` to 0. |
387 | | // TODO(palfrey) This should be put into utils, as this logic is used |
388 | | // elsewhere in the code. |
389 | 27 | pub fn calculate_range( |
390 | 27 | received_range: &Range<u64>, |
391 | 27 | send_range: &Range<u64>, |
392 | 27 | ) -> Result<Option<Range<usize>>, Error> { |
393 | | // Protect against subtraction overflow. |
394 | 27 | if received_range.start >= received_range.end { |
395 | 0 | return Ok(None); |
396 | 27 | } |
397 | | |
398 | 27 | let start = max(received_range.start, send_range.start); |
399 | 27 | let end = min(received_range.end, send_range.end); |
400 | 27 | if received_range.contains(&start) && received_range25 .contains25 (&(end - 1)25 ) { |
401 | | // Offset both to the start of the received_range. |
402 | 23 | let calculated_range_start = usize::try_from(start - received_range.start) |
403 | 23 | .err_tip(|| "Could not convert (start - received_range.start) to usize")?0 ; |
404 | 23 | let calculated_range_end = usize::try_from(end - received_range.start) |
405 | 23 | .err_tip(|| "Could not convert (end - received_range.start) to usize")?0 ; |
406 | 23 | Ok(Some(calculated_range_start..calculated_range_end)) |
407 | | } else { |
408 | 4 | Ok(None) |
409 | | } |
410 | 27 | } |
411 | | } |
412 | | |
413 | | #[async_trait] |
414 | | impl StoreDriver for FastSlowStore { |
415 | | async fn has_with_results( |
416 | | self: Pin<&Self>, |
417 | | key: &[StoreKey<'_>], |
418 | | results: &mut [Option<u64>], |
419 | 24 | ) -> Result<(), Error> { |
420 | | // If our slow store is a noop store, it'll always return a 404, |
421 | | // so only check the fast store in such case. |
422 | | let slow_store = self.slow_store.inner_store::<StoreKey<'_>>(None); |
423 | | if slow_store.optimized_for(StoreOptimizations::NoopDownloads) { |
424 | | return self.fast_store.has_with_results(key, results).await; |
425 | | } |
426 | | // Primary lookup is the slow store because that's authoritative for |
427 | | // downstream consumers that fetch from there. But the slow store |
428 | | // alone can miss two important cases: |
429 | | // 1. A concurrent writer's slow-store write is still in flight. |
430 | | // 2. The blob is present in the fast (local) store — either |
431 | | // fast-only by configuration, or because the slow write has |
432 | | // not yet started/completed. |
433 | | // Reporting NotFound in those cases causes redundant duplicate |
434 | | // uploads or unnecessary slow-store fetches. |
435 | | self.slow_store.has_with_results(key, results).await?; |
436 | | |
437 | | // Fill in any blobs whose slow-store write is currently in flight. |
438 | | // Cheap when the map is empty (the common case). |
439 | | { |
440 | | let in_flight = self.in_flight_slow_writes.lock(); |
441 | | if !in_flight.is_empty() { |
442 | | for (k, result) in key.iter().zip(results.iter_mut()) { |
443 | | if result.is_none() { |
444 | | let owned = k.borrow().into_owned(); |
445 | | if let Some(size) = in_flight.get(&owned) { |
446 | | *result = Some(*size); |
447 | | } |
448 | | } |
449 | | } |
450 | | } |
451 | | } |
452 | | |
453 | | // Fall back to the fast store for anything still missing. This |
454 | | // covers fast-only writes and the brief window between fast-store |
455 | | // insertion and slow-store write start. |
456 | | let missing_indices: Vec<usize> = results |
457 | | .iter() |
458 | | .enumerate() |
459 | 25 | .filter_map(|(i, r)| if r.is_none() { Some(i)16 } else { None9 }) |
460 | | .collect(); |
461 | | if !missing_indices.is_empty() { |
462 | | let missing_keys: Vec<StoreKey<'_>> = |
463 | 16 | missing_indices.iter().map(|&i| key[i].borrow()).collect(); |
464 | | let mut fast_results = vec![None; missing_keys.len()]; |
465 | | self.fast_store |
466 | | .has_with_results(&missing_keys, &mut fast_results) |
467 | | .await?; |
468 | | for (j, &orig_idx) in missing_indices.iter().enumerate() { |
469 | | if fast_results[j].is_some() { |
470 | | results[orig_idx] = fast_results[j]; |
471 | | } |
472 | | } |
473 | | } |
474 | | Ok(()) |
475 | 24 | } |
476 | | |
477 | | async fn update( |
478 | | self: Pin<&Self>, |
479 | | key: StoreKey<'_>, |
480 | | mut reader: DropCloserReadHalf, |
481 | | size_info: UploadSizeInfo, |
482 | 123 | ) -> Result<(), Error> { |
483 | | // If either one of our stores is a noop store, bypass the multiplexing |
484 | | // and just use the store that is not a noop store. |
485 | | let ignore_slow = self |
486 | | .slow_store |
487 | | .inner_store(Some(key.borrow())) |
488 | | .optimized_for(StoreOptimizations::NoopUpdates) |
489 | | || self.slow_direction == StoreDirection::ReadOnly |
490 | | || self.slow_direction == StoreDirection::Get; |
491 | | let ignore_fast = self |
492 | | .fast_store |
493 | | .inner_store(Some(key.borrow())) |
494 | | .optimized_for(StoreOptimizations::NoopUpdates) |
495 | | || self.fast_direction == StoreDirection::ReadOnly |
496 | | || self.fast_direction == StoreDirection::Get; |
497 | | if ignore_slow && ignore_fast { |
498 | | // We need to drain the reader to avoid the writer complaining that we dropped |
499 | | // the connection prematurely. |
500 | | reader |
501 | | .drain() |
502 | | .await |
503 | | .err_tip(|| "In FastFlowStore::update")?; |
504 | | return Ok(()); |
505 | | } |
506 | | if ignore_slow { |
507 | | return self.fast_store.update(key, reader, size_info).await; |
508 | | } |
509 | | if ignore_fast { |
510 | | let _guard = |
511 | | self.register_in_flight_slow_write(key.borrow(), Self::track_size(&key, size_info)); |
512 | | return self.slow_store.update(key, reader, size_info).await; |
513 | | } |
514 | | |
515 | | let _slow_in_flight_guard = |
516 | | self.register_in_flight_slow_write(key.borrow(), Self::track_size(&key, size_info)); |
517 | | |
518 | | let (mut fast_tx, fast_rx) = make_buf_channel_pair(); |
519 | | let (mut slow_tx, slow_rx) = make_buf_channel_pair(); |
520 | | |
521 | | let key_debug = format!("{key:?}"); |
522 | | trace!( |
523 | | key = %key_debug, |
524 | | "FastSlowStore::update: starting dual-store upload", |
525 | | ); |
526 | | let update_start = std::time::Instant::now(); |
527 | | let mut bytes_sent: u64 = 0; |
528 | | |
529 | 117 | let data_stream_fut = async move { |
530 | | loop { |
531 | 189 | let buffer = reader |
532 | 189 | .recv() |
533 | 189 | .await |
534 | 189 | .err_tip(|| "Failed to read buffer in fastslow store")?0 ; |
535 | 189 | if buffer.is_empty() { |
536 | | // EOF received. |
537 | 117 | fast_tx.send_eof().err_tip( |
538 | | || "Failed to write eof to fast store in fast_slow store update", |
539 | 0 | )?; |
540 | 117 | slow_tx |
541 | 117 | .send_eof() |
542 | 117 | .err_tip(|| "Failed to write eof to writer in fast_slow store update")?0 ; |
543 | 117 | debug!( |
544 | | total_bytes = bytes_sent, |
545 | | "FastSlowStore::update: data_stream sent EOF to both stores", |
546 | | ); |
547 | 117 | return Result::<(), Error>::Ok(()); |
548 | 72 | } |
549 | | |
550 | 72 | let chunk_len = buffer.len(); |
551 | 72 | let send_start = std::time::Instant::now(); |
552 | 72 | let (fast_result, slow_result) = |
553 | 72 | join!(fast_tx.send(buffer.clone()), slow_tx.send(buffer)); |
554 | 72 | let send_elapsed = send_start.elapsed(); |
555 | 72 | if send_elapsed.as_secs() >= 5 { |
556 | 0 | warn!( |
557 | | chunk_len, |
558 | 0 | send_elapsed_ms = send_elapsed.as_millis(), |
559 | | total_bytes = bytes_sent, |
560 | | "FastSlowStore::update: channel send stalled (>5s). A downstream store may be hanging", |
561 | | ); |
562 | 72 | } |
563 | 72 | bytes_sent += u64::try_from(chunk_len).unwrap_or(u64::MAX); |
564 | 72 | fast_result |
565 | 72 | .map_err(|e| {0 |
566 | 0 | make_err!( |
567 | 0 | Code::Internal, |
568 | | "Failed to send message to fast_store in fast_slow_store {:?}", |
569 | | e |
570 | | ) |
571 | 0 | }) |
572 | 72 | .merge(slow_result.map_err(|e| {0 |
573 | 0 | make_err!( |
574 | 0 | Code::Internal, |
575 | | "Failed to send message to slow_store in fast_slow store {:?}", |
576 | | e |
577 | | ) |
578 | 0 | }))?; |
579 | | } |
580 | 117 | }; |
581 | | |
582 | | let fast_store_fut = self.fast_store.update(key.borrow(), fast_rx, size_info); |
583 | | let slow_store_fut = self.slow_store.update(key.borrow(), slow_rx, size_info); |
584 | | |
585 | | let (data_stream_res, fast_res, slow_res) = |
586 | | join!(data_stream_fut, fast_store_fut, slow_store_fut); |
587 | | |
588 | | let total_elapsed = update_start.elapsed(); |
589 | | if data_stream_res.is_err() || fast_res.is_err() || slow_res.is_err() { |
590 | | let all_not_found = [&data_stream_res, &fast_res, &slow_res] |
591 | | .iter() |
592 | 0 | .all(|r| match r { |
593 | 0 | Ok(()) => true, |
594 | 0 | Err(e) => e.code == Code::NotFound, |
595 | 0 | }); |
596 | | if all_not_found { |
597 | | info!( |
598 | | key = %key_debug, |
599 | | elapsed_ms = total_elapsed.as_millis(), |
600 | | data_stream_ok = data_stream_res.is_ok(), |
601 | | fast_store_ok = fast_res.is_ok(), |
602 | | slow_store_ok = slow_res.is_ok(), |
603 | | "FastSlowStore::update: completed with NotFound error(s)", |
604 | | ); |
605 | | } else { |
606 | | warn!( |
607 | | key = %key_debug, |
608 | | elapsed_ms = total_elapsed.as_millis(), |
609 | | data_stream_ok = data_stream_res.is_ok(), |
610 | | fast_store_ok = fast_res.is_ok(), |
611 | | slow_store_ok = slow_res.is_ok(), |
612 | | "FastSlowStore::update: completed with error(s)", |
613 | | ); |
614 | | } |
615 | | } else { |
616 | | trace!( |
617 | | key = %key_debug, |
618 | | elapsed_ms = total_elapsed.as_millis(), |
619 | | "FastSlowStore::update: completed successfully", |
620 | | ); |
621 | | } |
622 | | data_stream_res.merge(fast_res).merge(slow_res)?; |
623 | | Ok(()) |
624 | 123 | } |
625 | | |
626 | | /// `FastSlowStore` has optimizations for dealing with files. |
627 | 0 | fn optimized_for(&self, optimization: StoreOptimizations) -> bool { |
628 | 0 | optimization == StoreOptimizations::FileUpdates |
629 | 0 | } |
630 | | |
631 | | /// Optimized variation to consume the file if one of the stores is a |
632 | | /// filesystem store. This makes the operation a move instead of a copy |
633 | | /// dramatically increasing performance for large files. |
634 | | async fn update_with_whole_file( |
635 | | self: Pin<&Self>, |
636 | | key: StoreKey<'_>, |
637 | | path: OsString, |
638 | | mut file: fs::FileSlot, |
639 | | upload_size: UploadSizeInfo, |
640 | 10 | ) -> Result<Option<fs::FileSlot>, Error> { |
641 | | trace!( |
642 | | key = ?key, |
643 | | ?upload_size, |
644 | | "FastSlowStore::update_with_whole_file: starting", |
645 | | ); |
646 | | if self |
647 | | .fast_store |
648 | | .optimized_for(StoreOptimizations::FileUpdates) |
649 | | { |
650 | | if !self |
651 | | .slow_store |
652 | | .inner_store(Some(key.borrow())) |
653 | | .optimized_for(StoreOptimizations::NoopUpdates) |
654 | | && self.slow_direction != StoreDirection::ReadOnly |
655 | | && self.slow_direction != StoreDirection::Get |
656 | | { |
657 | | trace!("FastSlowStore::update_with_whole_file: uploading to slow_store"); |
658 | | let slow_start = std::time::Instant::now(); |
659 | | let _guard = self.register_in_flight_slow_write( |
660 | | key.borrow(), |
661 | | Self::track_size(&key, upload_size), |
662 | | ); |
663 | | slow_update_store_with_file( |
664 | | self.slow_store.as_store_driver_pin(), |
665 | | key.borrow(), |
666 | | &mut file, |
667 | | upload_size, |
668 | | ) |
669 | | .await |
670 | | .err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")?; |
671 | | trace!( |
672 | | elapsed_ms = slow_start.elapsed().as_millis(), |
673 | | "FastSlowStore::update_with_whole_file: slow_store upload completed", |
674 | | ); |
675 | | } |
676 | | if self.fast_direction == StoreDirection::ReadOnly |
677 | | || self.fast_direction == StoreDirection::Get |
678 | | { |
679 | | return Ok(Some(file)); |
680 | | } |
681 | | return self |
682 | | .fast_store |
683 | | .update_with_whole_file(key, path, file, upload_size) |
684 | | .await; |
685 | | } |
686 | | |
687 | | if self |
688 | | .slow_store |
689 | | .optimized_for(StoreOptimizations::FileUpdates) |
690 | | { |
691 | | let ignore_fast = self |
692 | | .fast_store |
693 | | .inner_store(Some(key.borrow())) |
694 | | .optimized_for(StoreOptimizations::NoopUpdates) |
695 | | || self.fast_direction == StoreDirection::ReadOnly |
696 | | || self.fast_direction == StoreDirection::Get; |
697 | | if !ignore_fast { |
698 | | slow_update_store_with_file( |
699 | | self.fast_store.as_store_driver_pin(), |
700 | | key.borrow(), |
701 | | &mut file, |
702 | | upload_size, |
703 | | ) |
704 | | .await |
705 | | .err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?; |
706 | | } |
707 | | let ignore_slow = self.slow_direction == StoreDirection::ReadOnly |
708 | | || self.slow_direction == StoreDirection::Get; |
709 | | if ignore_slow { |
710 | | return Ok(Some(file)); |
711 | | } |
712 | | let _guard = self |
713 | | .register_in_flight_slow_write(key.borrow(), Self::track_size(&key, upload_size)); |
714 | | return self |
715 | | .slow_store |
716 | | .update_with_whole_file(key, path, file, upload_size) |
717 | | .await; |
718 | | } |
719 | | |
720 | | slow_update_store_with_file(self, key, &mut file, upload_size) |
721 | | .await |
722 | | .err_tip(|| "In FastSlowStore::update_with_whole_file")?; |
723 | | Ok(Some(file)) |
724 | 10 | } |
725 | | |
726 | | async fn get_part( |
727 | | self: Pin<&Self>, |
728 | | key: StoreKey<'_>, |
729 | | writer: &mut DropCloserWriteHalf, |
730 | | offset: u64, |
731 | | length: Option<u64>, |
732 | 124 | ) -> Result<(), Error> { |
733 | | // TODO(palfrey) Investigate if we should maybe ignore errors here instead of |
734 | | // forwarding them up. |
735 | | if self.fast_store.has(key.borrow()).await?.is_some() { |
736 | | self.metrics |
737 | | .fast_store_hit_count |
738 | | .fetch_add(1, Ordering::Acquire); |
739 | | self.fast_store |
740 | | .get_part(key, writer.borrow_mut(), offset, length) |
741 | | .await?; |
742 | | self.metrics |
743 | | .fast_store_downloaded_bytes |
744 | | .fetch_add(writer.get_bytes_written(), Ordering::Acquire); |
745 | | return Ok(()); |
746 | | } |
747 | | |
748 | | // If the fast store is noop or read only or update only then bypass it. |
749 | | if self |
750 | | .fast_store |
751 | | .inner_store(Some(key.borrow())) |
752 | | .optimized_for(StoreOptimizations::NoopUpdates) |
753 | | || self.fast_direction == StoreDirection::ReadOnly |
754 | | || self.fast_direction == StoreDirection::Update |
755 | | { |
756 | | self.metrics |
757 | | .slow_store_hit_count |
758 | | .fetch_add(1, Ordering::Acquire); |
759 | | self.slow_store |
760 | | .get_part(key, writer.borrow_mut(), offset, length) |
761 | | .await?; |
762 | | self.metrics |
763 | | .slow_store_downloaded_bytes |
764 | | .fetch_add(writer.get_bytes_written(), Ordering::Acquire); |
765 | | return Ok(()); |
766 | | } |
767 | | |
768 | | let mut writer = Some(writer); |
769 | | |
770 | | // Drive the dedup loader. Two distinct paths: |
771 | | // |
772 | | // * Leader (created the OnceCell entry): runs `populate` with |
773 | | // OUR `writer`, streaming directly to the caller while |
774 | | // filling the fast cache. No timeout: a multi-GB blob |
775 | | // legitimately takes minutes to stream, and cancelling our |
776 | | // own populate would propagate the failure to every other |
777 | | // reader of this digest. |
778 | | // |
779 | | // * Follower: bound the wait so a wedged leader does not pin |
780 | | // us until the upstream `gRPC` deadline fires. The follower |
781 | | // closure passes `None` for `writer`. This is critical: if |
782 | | // the OnceCell ever promotes our follower closure to leader |
783 | | // (because the original leader's future was dropped), and |
784 | | // our `tokio::time::timeout` then cancels it, *no* caller |
785 | | // `writer` was ever moved into the populate stream, so no |
786 | | // `gRPC` sender is dropped without EOF. The follower then |
787 | | // re-enters `get_part` below and reads from the now-warm |
788 | | // fast cache, OR falls back to the slow store on timeout. |
789 | | let needs_slow_store_fallback: bool = { |
790 | | let loader_guard = self.get_loader(key.borrow()); |
791 | | let is_leader = loader_guard.is_leader; |
792 | | if is_leader { |
793 | | loader_guard |
794 | 14 | .get_or_try_init(|| { |
795 | 14 | self.populate_and_maybe_stream(key.borrow(), writer.take(), offset, length) |
796 | 14 | }) |
797 | | .await?; |
798 | | false |
799 | | } else { |
800 | 0 | let load_fut = loader_guard.get_or_try_init(|| { |
801 | 0 | self.populate_and_maybe_stream(key.borrow(), None, offset, length) |
802 | 0 | }); |
803 | | match tokio::time::timeout(LEADER_WAIT_TIMEOUT, load_fut).await { |
804 | | Ok(result) => { |
805 | | result?; |
806 | | false |
807 | | } |
808 | | Err(_elapsed) => { |
809 | | self.metrics |
810 | | .leader_wait_timeouts |
811 | | .fetch_add(1, Ordering::Acquire); |
812 | | warn!( |
813 | | %key, |
814 | | timeout_secs = LEADER_WAIT_TIMEOUT.as_secs(), |
815 | | "FastSlowStore::get_part: leader-wait exceeded timeout, bypassing dedup and reading slow store directly", |
816 | | ); |
817 | | true |
818 | | } |
819 | | } |
820 | | } |
821 | | }; |
822 | | |
823 | | if needs_slow_store_fallback && let Some(writer) = writer.take() { |
824 | | return self |
825 | | .slow_store |
826 | | .get_part(key, writer, offset, length) |
827 | | .await |
828 | | .err_tip( |
829 | | || "In FastSlowStore::get_part slow_store fallback after leader-wait timeout", |
830 | | ); |
831 | | } |
832 | | |
833 | | // If we didn't stream then re-enter which will stream from the fast |
834 | | // store, or retry the download. We should not get in a loop here |
835 | | // because OnceCell has the good sense to retry for all callers so in |
836 | | // order to get here the fast store will have been populated. There's |
837 | | // an outside chance it was evicted, but that's slim. |
838 | | if let Some(writer) = writer.take() { |
839 | | self.get_part(key, writer, offset, length).await |
840 | | } else { |
841 | | // This was the thread that did the streaming already, lucky duck. |
842 | | Ok(()) |
843 | | } |
844 | 124 | } |
845 | | |
/// Returns this store itself as the innermost `StoreDriver`.
///
/// The key is ignored (hence `_key`): no delegation to the fast or slow
/// store happens here, whatever key is supplied.
fn inner_store(&self, _key: Option<StoreKey>) -> &dyn StoreDriver {
    self
}
849 | | |
/// Exposes this store as a borrowed `dyn Any` trait object (e.g. so a
/// caller holding only a trait object can downcast back to the concrete
/// type). The returned reference borrows from `self` for `'a`.
fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
    self
}
853 | | |
/// Owned counterpart of `as_any`: consumes the `Arc<Self>` and hands it
/// back as an `Arc<dyn Any>` pointing at the same allocation.
fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
    self
}
857 | | |
858 | 0 | fn register_remove_callback( |
859 | 0 | self: Arc<Self>, |
860 | 0 | callback: Arc<dyn RemoveItemCallback>, |
861 | 0 | ) -> Result<(), Error> { |
862 | 0 | self.fast_store.register_remove_callback(callback.clone())?; |
863 | 0 | self.slow_store.register_remove_callback(callback)?; |
864 | 0 | Ok(()) |
865 | 0 | } |
866 | | } |
867 | | |
/// Counters describing how `FastSlowStore` reads were served.
///
/// Every field is an `AtomicU64`, so the counters can be updated through a
/// shared reference; the derived `Default` starts them all at zero.
#[derive(Debug, Default, MetricsComponent)]
struct FastSlowStoreMetrics {
    #[metric(help = "Hit count for the fast store")]
    fast_store_hit_count: AtomicU64,
    #[metric(help = "Downloaded bytes from the fast store")]
    fast_store_downloaded_bytes: AtomicU64,
    // Bumped by `get_part` when the fast store is bypassed and the read is
    // served straight from the slow store.
    // NOTE(review): the leader-wait timeout fallback in `get_part` also reads
    // the slow store but does not appear to touch this counter — confirm
    // whether that is intentional.
    #[metric(help = "Hit count for the slow store")]
    slow_store_hit_count: AtomicU64,
    // On the bypass path this grows by the writer's `get_bytes_written()`
    // after the slow-store read completes.
    #[metric(help = "Downloaded bytes from the slow store")]
    slow_store_downloaded_bytes: AtomicU64,
    // Bumped by `get_part` each time a follower's wait on the leader
    // populator exceeds `LEADER_WAIT_TIMEOUT`.
    #[metric(
        help = "Number of times a follower bypassed the populating-digests dedup because the leader exceeded LEADER_WAIT_TIMEOUT"
    )]
    leader_wait_timeouts: AtomicU64,
}
883 | | |
/// Maximum time a follower will wait on the leader-populator before
/// bypassing the dedup map and reading directly from the slow store.
///
/// Without this bound a single wedged populator would block every
/// concurrent reader of the same digest until each one's own `gRPC`
/// deadline fired (e.g. Bazel's `--remote_timeout`), turning a
/// single slow read into a fan-out of `DEADLINE_EXCEEDED` errors.
///
/// Applied with `tokio::time::timeout` around the follower's wait in
/// `get_part`; the leader's own populate is deliberately left unbounded
/// because a large blob can legitimately take minutes to stream. Each
/// expiry increments the `leader_wait_timeouts` metric and emits a warning
/// that reports this value in whole seconds.
const LEADER_WAIT_TIMEOUT: Duration = Duration::from_secs(60);
892 | | |
// Invokes the `default_health_status_indicator!` macro (imported from
// `nativelink_util::health_utils`) for `FastSlowStore`.
// NOTE(review): presumably this generates the stock `HealthStatusIndicator`
// impl for the type — confirm against the macro definition.
default_health_status_indicator!(FastSlowStore);