Coverage Report

Created: 2025-12-16 15:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/store_trait.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::borrow::{Borrow, BorrowMut};
16
use core::convert::Into;
17
use core::fmt::{self, Debug, Display};
18
use core::hash::{Hash, Hasher};
19
use core::ops::{Bound, RangeBounds};
20
use core::pin::Pin;
21
use core::ptr::addr_eq;
22
use std::borrow::Cow;
23
use std::collections::hash_map::DefaultHasher as StdHasher;
24
use std::ffi::OsString;
25
use std::sync::{Arc, OnceLock};
26
27
use async_trait::async_trait;
28
use bytes::{Bytes, BytesMut};
29
use futures::{Future, FutureExt, Stream, join, try_join};
30
use nativelink_error::{Code, Error, ResultExt, error_if, make_err};
31
use nativelink_metric::MetricsComponent;
32
use rand::rngs::StdRng;
33
use rand::{RngCore, SeedableRng};
34
use serde::{Deserialize, Serialize};
35
use tokio::io::{AsyncReadExt, AsyncSeekExt};
36
use tracing::warn;
37
38
use crate::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair};
39
use crate::common::DigestInfo;
40
use crate::digest_hasher::{DigestHasher, DigestHasherFunc, default_digest_hasher_func};
41
use crate::fs;
42
use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
43
44
/// Process-wide health-check payload size. It is written at most once: either
/// explicitly by `set_default_digest_size_health_check`, or lazily with
/// `DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG` on the first read.
static DEFAULT_DIGEST_SIZE_HEALTH_CHECK: OnceLock<usize> = OnceLock::new();
/// Default digest size for health check data. Any change in this value
/// changes the default contract. `GlobalConfig` should be updated to reflect
/// changes in this value.
pub const DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG: usize = 1024 * 1024;

/// Returns the digest size used for health-check data.
///
/// If no size has been set yet, the system-wide default
/// `DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG` is stored and returned. From then on
/// the value is fixed for the lifetime of the process.
pub fn default_digest_size_health_check() -> usize {
    let size = DEFAULT_DIGEST_SIZE_HEALTH_CHECK
        .get_or_init(|| DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG);
    *size
}
54
55
/// Set the default digest size for health check data, this should be called once.
56
0
pub fn set_default_digest_size_health_check(size: usize) -> Result<(), Error> {
57
0
    DEFAULT_DIGEST_SIZE_HEALTH_CHECK.set(size).map_err(|_| {
58
0
        make_err!(
59
0
            Code::Internal,
60
            "set_default_digest_size_health_check already set"
61
        )
62
0
    })
63
0
}
64
65
#[derive(Debug, PartialEq, Eq, Copy, Clone, Serialize, Deserialize)]
66
pub enum UploadSizeInfo {
67
    /// When the data transfer amount is known to be exact size, this enum should be used.
68
    /// The receiver store can use this to better optimize the way the data is sent or stored.
69
    ExactSize(u64),
70
71
    /// When the data transfer amount is not known to be exact, the caller should use this enum
72
    /// to provide the maximum size that could possibly be sent. This will bypass the exact size
73
    /// checks, but still provide useful information to the underlying store about the data being
74
    /// sent that it can then use to optimize the upload process.
75
    MaxSize(u64),
76
}
77
78
/// Utility to send all the data to the store from a file.
79
// Note: This is not inlined because some code may want to bypass any underlying
80
// optimizations that may be present in the inner store.
81
6
pub async fn slow_update_store_with_file<S: StoreDriver + ?Sized>(
82
6
    store: Pin<&S>,
83
6
    digest: impl Into<StoreKey<'_>>,
84
6
    file: &mut fs::FileSlot,
85
6
    upload_size: UploadSizeInfo,
86
6
) -> Result<(), Error> {
87
6
    file.rewind()
88
6
        .await
89
6
        .err_tip(|| "Failed to rewind in upload_file_to_store")
?0
;
90
6
    let (mut tx, rx) = make_buf_channel_pair();
91
92
6
    let update_fut = store
93
6
        .update(digest.into(), rx, upload_size)
94
6
        .map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store"));
95
6
    let read_data_fut = async move {
96
        loop {
97
75
            let mut buf = BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE);
98
75
            let read = file
99
75
                .read_buf(&mut buf)
100
75
                .await
101
75
                .err_tip(|| "Failed to read in upload_file_to_store")
?0
;
102
75
            if read == 0 {
  Branch (102:16): [True: 5, False: 5]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [Folded - Ignored]
  Branch (102:16): [True: 1, False: 64]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [Folded - Ignored]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [True: 0, False: 0]
103
6
                break;
104
69
            }
105
69
            tx.send(buf.freeze())
106
69
                .await
107
69
                .err_tip(|| "Failed to send in upload_file_to_store")
?0
;
108
        }
109
6
        tx.send_eof()
110
6
            .err_tip(|| "Could not send EOF to store in upload_file_to_store")
111
6
    };
112
6
    tokio::pin!(read_data_fut);
113
6
    let (update_res, read_res) = tokio::join!(update_fut, read_data_fut);
114
6
    update_res.merge(read_res)
115
6
}
116
117
/// Capabilities a store may advertise to its callers.
///
/// Callers query these to choose a faster code path when the store can process
/// the data more efficiently in a particular way.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum StoreOptimizations {
    /// Uploads can be optimized when the data is known to come from a file.
    FileUpdates,

    /// Data uploads are ignored by the store.
    NoopUpdates,

    /// The store never serves downloads.
    NoopDownloads,

    /// Whether a key has associated data is determined when a read is
    /// attempted, rather than by calling `.has()` beforehand.
    LazyExistenceOnSync,

    /// The store ships an optimized `update_oneshot` that accepts complete
    /// `Bytes` directly, skipping the MPSC channel and its overhead.
    SubscribesToUpdateOneshot,
}
140
141
/// A wrapper struct for [`StoreKey`] to work around
142
/// lifetime limitations in `HashMap::get()` as described in
143
/// <https://github.com/rust-lang/rust/issues/80389>
144
///
145
/// As such this is a wrapper type that is stored in the
146
/// maps using the workaround as described in
147
/// <https://blinsay.com/blog/compound-keys/>
148
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
149
#[repr(transparent)]
150
pub struct StoreKeyBorrow(StoreKey<'static>);
151
152
impl From<StoreKey<'static>> for StoreKeyBorrow {
153
5.85k
    fn from(key: StoreKey<'static>) -> Self {
154
5.85k
        Self(key)
155
5.85k
    }
156
}
157
158
impl From<StoreKeyBorrow> for StoreKey<'static> {
159
0
    fn from(key_borrow: StoreKeyBorrow) -> Self {
160
0
        key_borrow.0
161
0
    }
162
}
163
164
impl<'a> Borrow<StoreKey<'a>> for StoreKeyBorrow {
165
4.66k
    fn borrow(&self) -> &StoreKey<'a> {
166
4.66k
        &self.0
167
4.66k
    }
168
}
169
170
impl<'a> Borrow<StoreKey<'a>> for &StoreKeyBorrow {
171
0
    fn borrow(&self) -> &StoreKey<'a> {
172
0
        &self.0
173
0
    }
174
}
175
176
/// Holds something that can be converted into a key the
177
/// store API can understand. Generally this is a digest
178
/// but it can also be a string if the caller wishes to
179
/// store the data directly and reference it by a string
180
/// directly.
181
#[derive(Debug, Eq)]
182
pub enum StoreKey<'a> {
183
    /// A string key.
184
    Str(Cow<'a, str>),
185
186
    /// A key that is a digest.
187
    Digest(DigestInfo),
188
}
189
190
impl<'a> StoreKey<'a> {
191
    /// Creates a new store key from a string.
192
22
    pub const fn new_str(s: &'a str) -> Self {
193
22
        StoreKey::Str(Cow::Borrowed(s))
194
22
    }
195
196
    /// Returns a shallow clone of the key.
197
    /// This is extremely cheap and should be used when clone
198
    /// is needed but the key is not going to be modified.
199
    #[must_use]
200
    #[allow(
201
        clippy::missing_const_for_fn,
202
        reason = "False positive on stable, but not on nightly"
203
    )]
204
3.13k
    pub fn borrow(&'a self) -> Self {
205
54
        match self {
206
15
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Borrowed(s)),
207
39
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Borrowed(s)),
208
3.07k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
209
        }
210
3.13k
    }
211
212
    /// Converts the key into an owned version. This is useful
213
    /// when the caller needs an owned version of the key.
214
12.0k
    pub fn into_owned(self) -> StoreKey<'static> {
215
68
        match self {
216
14
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Owned(s)),
217
54
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Owned(s.to_owned())),
218
11.9k
            StoreKey::Digest(d) => StoreKey::Digest(d),
219
        }
220
12.0k
    }
221
222
    /// Converts the key into a digest. This is useful when the caller
223
    /// must have a digest key. If the data is not a digest, it may
224
    /// hash the underlying key and return a digest of the hash of the key
225
113
    pub fn into_digest(self) -> DigestInfo {
226
113
        match self {
227
111
            StoreKey::Digest(digest) => digest,
228
2
            StoreKey::Str(s) => {
229
2
                let mut hasher = DigestHasherFunc::Blake3.hasher();
230
2
                hasher.update(s.as_bytes());
231
2
                hasher.finalize_digest()
232
            }
233
        }
234
113
    }
235
236
    /// Returns the key as a string. If the key is a digest, it will
237
    /// return a string representation of the digest. If the key is a string,
238
    /// it will return the string itself.
239
160
    pub fn as_str(&'a self) -> Cow<'a, str> {
240
90
        match self {
241
90
            StoreKey::Str(Cow::Owned(s)) => Cow::Borrowed(s),
242
0
            StoreKey::Str(Cow::Borrowed(s)) => Cow::Borrowed(s),
243
70
            StoreKey::Digest(d) => Cow::Owned(format!("{d}")),
244
        }
245
160
    }
246
}
247
248
impl Clone for StoreKey<'static> {
249
10.2k
    fn clone(&self) -> Self {
250
10.2k
        match self {
251
27
            StoreKey::Str(s) => StoreKey::Str(s.clone()),
252
10.2k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
253
        }
254
10.2k
    }
255
}
256
257
impl PartialOrd for StoreKey<'_> {
258
36
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
259
36
        Some(self.cmp(other))
260
36
    }
261
}
262
263
impl Ord for StoreKey<'_> {
264
53
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
265
53
        match (self, other) {
266
53
            (StoreKey::Str(a), StoreKey::Str(b)) => a.cmp(b),
267
0
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a.cmp(b),
268
0
            (StoreKey::Str(_), StoreKey::Digest(_)) => core::cmp::Ordering::Less,
269
0
            (StoreKey::Digest(_), StoreKey::Str(_)) => core::cmp::Ordering::Greater,
270
        }
271
53
    }
272
}
273
274
impl PartialEq for StoreKey<'_> {
275
5.55k
    fn eq(&self, other: &Self) -> bool {
276
5.55k
        match (self, other) {
277
50
            (StoreKey::Str(a), StoreKey::Str(b)) => a == b,
278
5.50k
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a == b,
279
0
            _ => false,
280
        }
281
5.55k
    }
282
}
283
284
impl Hash for StoreKey<'_> {
285
25.2k
    fn hash<H: Hasher>(&self, state: &mut H) {
286
        /// Salts the hash with the enum value that represents
287
        /// the type of the key.
288
        #[repr(u8)]
289
        enum HashId {
290
            Str = 0,
291
            Digest = 1,
292
        }
293
25.2k
        match self {
294
42
            StoreKey::Str(s) => {
295
42
                (HashId::Str as u8).hash(state);
296
42
                s.hash(state);
297
42
            }
298
25.1k
            StoreKey::Digest(d) => {
299
25.1k
                (HashId::Digest as u8).hash(state);
300
25.1k
                d.hash(state);
301
25.1k
            }
302
        }
303
25.2k
    }
304
}
305
306
impl<'a> From<&'a str> for StoreKey<'a> {
307
1
    fn from(s: &'a str) -> Self {
308
1
        StoreKey::Str(Cow::Borrowed(s))
309
1
    }
310
}
311
312
impl From<String> for StoreKey<'static> {
313
0
    fn from(s: String) -> Self {
314
0
        StoreKey::Str(Cow::Owned(s))
315
0
    }
316
}
317
318
impl From<DigestInfo> for StoreKey<'_> {
319
10.5k
    fn from(d: DigestInfo) -> Self {
320
10.5k
        StoreKey::Digest(d)
321
10.5k
    }
322
}
323
324
impl From<&DigestInfo> for StoreKey<'_> {
325
52
    fn from(d: &DigestInfo) -> Self {
326
52
        StoreKey::Digest(*d)
327
52
    }
328
}
329
330
// mostly for use with tracing::Value
331
impl Display for StoreKey<'_> {
332
15
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
333
15
        match self {
334
13
            StoreKey::Str(s) => {
335
13
                write!(f, "{s}")
336
            }
337
2
            StoreKey::Digest(d) => {
338
2
                write!(f, "Digest: {d}")
339
            }
340
        }
341
15
    }
342
}
343
344
#[derive(Clone, MetricsComponent)]
345
#[repr(transparent)]
346
pub struct Store {
347
    #[metric]
348
    inner: Arc<dyn StoreDriver>,
349
}
350
351
impl Debug for Store {
352
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
353
0
        f.debug_struct("Store").finish_non_exhaustive()
354
0
    }
355
}
356
357
impl Store {
358
315
    pub fn new(inner: Arc<dyn StoreDriver>) -> Self {
359
315
        Self { inner }
360
315
    }
361
362
    /// Returns the immediate inner store driver.
363
    /// Note: This does not recursively try to resolve underlying store drivers
364
    /// like `.inner_store()` does.
365
    #[inline]
366
1
    pub fn into_inner(self) -> Arc<dyn StoreDriver> {
367
1
        self.inner
368
1
    }
369
370
    /// Gets the underlying store for the given digest.
371
    /// A caller might want to use this to obtain a reference to the "real" underlying store
372
    /// (if applicable) and check if it implements some special traits that allow optimizations.
373
    /// Note: If the store performs complex operations on the data, it should return itself.
374
    #[inline]
375
228
    pub fn inner_store<'a, K: Into<StoreKey<'a>>>(&self, digest: Option<K>) -> &dyn StoreDriver {
376
228
        self.inner.inner_store(digest.map(Into::into))
377
228
    }
378
379
    /// Tries to cast the underlying store to the given type.
380
    #[inline]
381
67
    pub fn downcast_ref<U: StoreDriver>(&self, maybe_digest: Option<StoreKey<'_>>) -> Option<&U> {
382
67
        self.inner.inner_store(maybe_digest).as_any().downcast_ref()
383
67
    }
384
385
    /// Register health checks used to monitor the store.
386
    #[inline]
387
0
    pub fn register_health(&self, registry: &mut HealthRegistryBuilder) {
388
0
        self.inner.clone().register_health(registry);
389
0
    }
390
391
    #[inline]
392
8
    pub fn register_remove_callback(
393
8
        &self,
394
8
        callback: Arc<dyn RemoveItemCallback>,
395
8
    ) -> Result<(), Error> {
396
8
        self.inner.clone().register_remove_callback(callback)
397
8
    }
398
}
399
400
impl StoreLike for Store {
401
    #[inline]
402
10.0k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
403
10.0k
        self.inner.as_ref()
404
10.0k
    }
405
406
7
    fn as_pin(&self) -> Pin<&Self> {
407
7
        Pin::new(self)
408
7
    }
409
}
410
411
impl<T> StoreLike for T
412
where
413
    T: StoreDriver + Sized,
414
{
415
    #[inline]
416
8.02k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
417
8.02k
        self
418
8.02k
    }
419
420
90
    fn as_pin(&self) -> Pin<&Self> {
421
90
        Pin::new(self)
422
90
    }
423
}
424
425
pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
426
    /// Returns the immediate inner store driver.
427
    fn as_store_driver(&self) -> &'_ dyn StoreDriver;
428
429
    /// Utility function to return a pinned reference to self.
430
    fn as_pin(&self) -> Pin<&Self>;
431
432
    /// Utility function to return a pinned reference to the store driver.
433
    #[inline]
434
18.1k
    fn as_store_driver_pin(&self) -> Pin<&'_ dyn StoreDriver> {
435
18.1k
        Pin::new(self.as_store_driver())
436
18.1k
    }
437
438
    /// Look up a digest in the store and return None if it does not exist in
439
    /// the store, or Some(size) if it does.
440
    /// Note: On an AC store the size will be incorrect and should not be used!
441
    #[inline]
442
255
    fn has<'a>(
443
255
        &'a self,
444
255
        digest: impl Into<StoreKey<'a>>,
445
255
    ) -> impl Future<Output = Result<Option<u64>, Error>> + 'a {
446
255
        self.as_store_driver_pin().has(digest.into())
447
255
    }
448
449
    /// Look up a list of digests in the store and return a result for each in
450
    /// the same order as input.  The result will either be None if it does not
451
    /// exist in the store, or Some(size) if it does.
452
    /// Note: On an AC store the size will be incorrect and should not be used!
453
    #[inline]
454
19
    fn has_many<'a>(
455
19
        &'a self,
456
19
        digests: &'a [StoreKey<'a>],
457
19
    ) -> impl Future<Output = Result<Vec<Option<u64>>, Error>> + Send + 'a {
458
19
        self.as_store_driver_pin().has_many(digests)
459
19
    }
460
461
    /// The implementation of the above has and `has_many` functions.  See their
462
    /// documentation for details.
463
    #[inline]
464
56
    fn has_with_results<'a>(
465
56
        &'a self,
466
56
        digests: &'a [StoreKey<'a>],
467
56
        results: &'a mut [Option<u64>],
468
56
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
469
56
        self.as_store_driver_pin()
470
56
            .has_with_results(digests, results)
471
56
    }
472
473
    /// List all the keys in the store that are within the given range.
474
    /// `handler` is called for each key in the range. If `handler` returns
475
    /// false, the listing is stopped.
476
    ///
477
    /// The number of keys passed through the handler is the return value.
478
    #[inline]
479
15
    fn list<'a, 'b>(
480
15
        &'a self,
481
15
        range: impl RangeBounds<StoreKey<'b>> + Send + 'b,
482
15
        mut handler: impl for<'c> FnMut(&'c StoreKey) -> bool + Send + Sync + 'a,
483
15
    ) -> impl Future<Output = Result<u64, Error>> + Send + 'a
484
15
    where
485
15
        'b: 'a,
486
    {
487
        // Note: We use a manual async move, so the future can own the `range` and `handler`,
488
        // otherwise we'd require the caller to pass them in by reference making more borrow
489
        // checker noise.
490
15
        async move {
491
15
            self.as_store_driver_pin()
492
15
                .list(
493
15
                    (
494
15
                        range.start_bound().map(StoreKey::borrow),
495
15
                        range.end_bound().map(StoreKey::borrow),
496
15
                    ),
497
15
                    &mut handler,
498
15
                )
499
15
                .await
500
15
        }
501
15
    }
502
503
    /// Sends the data to the store.
504
    #[inline]
505
5.24k
    fn update<'a>(
506
5.24k
        &'a self,
507
5.24k
        digest: impl Into<StoreKey<'a>>,
508
5.24k
        reader: DropCloserReadHalf,
509
5.24k
        upload_size: UploadSizeInfo,
510
5.24k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
511
5.24k
        self.as_store_driver_pin()
512
5.24k
            .update(digest.into(), reader, upload_size)
513
5.24k
    }
514
515
    /// Any optimizations the store might want to expose to the callers.
516
    /// By default, no optimizations are exposed.
517
    #[inline]
518
20
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
519
20
        self.as_store_driver_pin().optimized_for(optimization)
520
20
    }
521
522
    /// Specialized version of `.update()` which takes a `FileSlot`.
523
    /// This is useful if the underlying store can optimize the upload process
524
    /// when it knows the data is coming from a file.
525
    #[inline]
526
9
    fn update_with_whole_file<'a>(
527
9
        &'a self,
528
9
        digest: impl Into<StoreKey<'a>>,
529
9
        path: OsString,
530
9
        file: fs::FileSlot,
531
9
        upload_size: UploadSizeInfo,
532
9
    ) -> impl Future<Output = Result<Option<fs::FileSlot>, Error>> + Send + 'a {
533
9
        self.as_store_driver_pin()
534
9
            .update_with_whole_file(digest.into(), path, file, upload_size)
535
9
    }
536
537
    /// Utility to send all the data to the store when you have all the bytes.
538
    #[inline]
539
5.67k
    fn update_oneshot<'a>(
540
5.67k
        &'a self,
541
5.67k
        digest: impl Into<StoreKey<'a>>,
542
5.67k
        data: Bytes,
543
5.67k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
544
5.67k
        self.as_store_driver_pin()
545
5.67k
            .update_oneshot(digest.into(), data)
546
5.67k
    }
547
548
    /// Retrieves part of the data from the store and writes it to the given writer.
549
    #[inline]
550
1.31k
    fn get_part<'a>(
551
1.31k
        &'a self,
552
1.31k
        digest: impl Into<StoreKey<'a>>,
553
1.31k
        mut writer: impl BorrowMut<DropCloserWriteHalf> + Send + 'a,
554
1.31k
        offset: u64,
555
1.31k
        length: Option<u64>,
556
1.31k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
557
1.31k
        let key = digest.into();
558
        // Note: We need to capture `writer` just in case the caller
559
        // expects the drop() method to be called on it when the future
560
        // is done due to the complex interaction between the DropCloserWriteHalf
561
        // and the DropCloserReadHalf during drop().
562
1.31k
        async move {
563
1.31k
            self.as_store_driver_pin()
564
1.31k
                .get_part(key, writer.borrow_mut(), offset, length)
565
1.31k
                .await
566
1.31k
        }
567
1.31k
    }
568
569
    /// Utility that works the same as `.get_part()`, but writes all the data.
570
    #[inline]
571
17
    fn get<'a>(
572
17
        &'a self,
573
17
        key: impl Into<StoreKey<'a>>,
574
17
        writer: DropCloserWriteHalf,
575
17
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
576
17
        self.as_store_driver_pin().get(key.into(), writer)
577
17
    }
578
579
    /// Utility that will return all the bytes at once instead of in a streaming manner.
580
    #[inline]
581
5.26k
    fn get_part_unchunked<'a>(
582
5.26k
        &'a self,
583
5.26k
        key: impl Into<StoreKey<'a>>,
584
5.26k
        offset: u64,
585
5.26k
        length: Option<u64>,
586
5.26k
    ) -> impl Future<Output = Result<Bytes, Error>> + Send + 'a {
587
5.26k
        self.as_store_driver_pin()
588
5.26k
            .get_part_unchunked(key.into(), offset, length)
589
5.26k
    }
590
591
    /// Default implementation of the health check. Some stores may want to override this
592
    /// in situations where the default implementation is not sufficient.
593
    #[inline]
594
1
    fn check_health(
595
1
        &self,
596
1
        namespace: Cow<'static, str>,
597
1
    ) -> impl Future<Output = HealthStatus> + Send {
598
1
        self.as_store_driver_pin().check_health(namespace)
599
1
    }
600
}
601
602
#[async_trait]
603
pub trait StoreDriver:
604
    Sync + Send + Unpin + MetricsComponent + HealthStatusIndicator + 'static
605
{
606
    /// See: [`StoreLike::has`] for details.
607
    #[inline]
608
278
    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
609
        let mut result = [None];
610
        self.has_with_results(&[key], &mut result).await?;
611
        Ok(result[0])
612
278
    }
613
614
    /// See: [`StoreLike::has_many`] for details.
615
    #[inline]
616
    async fn has_many(
617
        self: Pin<&Self>,
618
        digests: &[StoreKey<'_>],
619
19
    ) -> Result<Vec<Option<u64>>, Error> {
620
        let mut results = vec![None; digests.len()];
621
        self.has_with_results(digests, &mut results).await?;
622
        Ok(results)
623
19
    }
624
625
    /// See: [`StoreLike::has_with_results`] for details.
626
    async fn has_with_results(
627
        self: Pin<&Self>,
628
        digests: &[StoreKey<'_>],
629
        results: &mut [Option<u64>],
630
    ) -> Result<(), Error>;
631
632
    /// See: [`StoreLike::list`] for details.
633
    async fn list(
634
        self: Pin<&Self>,
635
        _range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
636
        _handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
637
0
    ) -> Result<u64, Error> {
638
        // TODO(palfrey) We should force all stores to implement this function instead of
639
        // providing a default implementation.
640
        Err(make_err!(
641
            Code::Unimplemented,
642
            "Store::list() not implemented for this store"
643
        ))
644
0
    }
645
646
    /// See: [`StoreLike::update`] for details.
647
    async fn update(
648
        self: Pin<&Self>,
649
        key: StoreKey<'_>,
650
        reader: DropCloserReadHalf,
651
        upload_size: UploadSizeInfo,
652
    ) -> Result<(), Error>;
653
654
    /// See: [`StoreLike::optimized_for`] for details.
655
2
    fn optimized_for(&self, _optimization: StoreOptimizations) -> bool {
656
2
        false
657
2
    }
658
659
    /// See: [`StoreLike::update_with_whole_file`] for details.
660
    async fn update_with_whole_file(
661
        self: Pin<&Self>,
662
        key: StoreKey<'_>,
663
        path: OsString,
664
        mut file: fs::FileSlot,
665
        upload_size: UploadSizeInfo,
666
1
    ) -> Result<Option<fs::FileSlot>, Error> {
667
        let inner_store = self.inner_store(Some(key.borrow()));
668
        if inner_store.optimized_for(StoreOptimizations::FileUpdates) {
669
            error_if!(
670
                addr_eq(inner_store, &raw const *self),
671
                "Store::inner_store() returned self when optimization present"
672
            );
673
            return Pin::new(inner_store)
674
                .update_with_whole_file(key, path, file, upload_size)
675
                .await;
676
        }
677
        slow_update_store_with_file(self, key, &mut file, upload_size).await?;
678
        Ok(Some(file))
679
1
    }
680
681
    /// See: [`StoreLike::update_oneshot`] for details.
682
5.13k
    async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> {
683
        // TODO(palfrey) This is extremely inefficient, since we have exactly
684
        // what we need here. Maybe we could instead make a version of the stream
685
        // that can take objects already fully in memory instead?
686
        let (mut tx, rx) = make_buf_channel_pair();
687
688
        let data_len =
689
            u64::try_from(data.len()).err_tip(|| "Could not convert data.len() to u64")?;
690
5.13k
        let send_fut = async move {
691
            // Only send if we are not EOF.
692
5.13k
            if !data.is_empty() {
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [Folded - Ignored]
  Branch (692:16): [True: 8, False: 0]
  Branch (692:16): [True: 4, False: 1]
  Branch (692:16): [True: 7, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 3, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 5, False: 0]
  Branch (692:16): [True: 1, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [Folded - Ignored]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 0, False: 0]
  Branch (692:16): [True: 1, False: 0]
  Branch (692:16): [True: 4, False: 2]
  Branch (692:16): [True: 1, False: 0]
  Branch (692:16): [True: 51, False: 32]
  Branch (692:16): [True: 1, False: 0]
  Branch (692:16): [True: 5.00k, False: 0]
  Branch (692:16): [True: 2, False: 0]
  Branch (692:16): [True: 7, False: 0]
693
5.09k
                tx.send(data)
694
5.09k
                    .await
695
5.09k
                    .err_tip(|| "Failed to write data in update_oneshot")
?0
;
696
35
            }
697
5.13k
            tx.send_eof()
698
5.13k
                .err_tip(|| "Failed to write EOF in update_oneshot")
?0
;
699
5.13k
            Ok(())
700
5.13k
        };
701
        try_join!(
702
            send_fut,
703
            self.update(key, rx, UploadSizeInfo::ExactSize(data_len))
704
        )?;
705
        Ok(())
706
5.13k
    }
707
708
    /// See: [`StoreLike::get_part`] for details.
709
    async fn get_part(
710
        self: Pin<&Self>,
711
        key: StoreKey<'_>,
712
        writer: &mut DropCloserWriteHalf,
713
        offset: u64,
714
        length: Option<u64>,
715
    ) -> Result<(), Error>;
716
717
    /// See: [`StoreLike::get`] for details.
718
    #[inline]
719
    async fn get(
720
        self: Pin<&Self>,
721
        key: StoreKey<'_>,
722
        mut writer: DropCloserWriteHalf,
723
17
    ) -> Result<(), Error> {
724
        self.get_part(key, &mut writer, 0, None).await
725
17
    }
726
727
    /// See: [`StoreLike::get_part_unchunked`] for details.
728
    async fn get_part_unchunked(
729
        self: Pin<&Self>,
730
        key: StoreKey<'_>,
731
        offset: u64,
732
        length: Option<u64>,
733
5.37k
    ) -> Result<Bytes, Error> {
734
        let length_usize = length
735
2.24k
            .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
736
            .transpose()?;
737
738
        // TODO(palfrey) This is extremely inefficient, since we have exactly
739
        // what we need here. Maybe we could instead make a version of the stream
740
        // that can take objects already fully in memory instead?
741
        let (mut tx, mut rx) = make_buf_channel_pair();
742
743
        let (data_res, get_part_res) = join!(
744
            rx.consume(length_usize),
745
            // We use a closure here to ensure that the `tx` is dropped when the
746
            // future is done.
747
5.37k
            async move { self.get_part(key, &mut tx, offset, length).await },
748
        );
749
        get_part_res
750
            .err_tip(|| "Failed to get_part in get_part_unchunked")
751
            .merge(data_res.err_tip(|| "Failed to read stream to completion in get_part_unchunked"))
752
5.37k
    }
753
754
    /// See: [`StoreLike::check_health`] for details.
755
1
    async fn check_health(self: Pin<&Self>, namespace: Cow<'static, str>) -> HealthStatus {
756
        let digest_data_size = default_digest_size_health_check();
757
        let mut digest_data = vec![0u8; digest_data_size];
758
759
        let mut namespace_hasher = StdHasher::new();
760
        namespace.hash(&mut namespace_hasher);
761
        self.get_name().hash(&mut namespace_hasher);
762
        let hash_seed = namespace_hasher.finish();
763
764
        // Fill the digest data with random data based on a stable
765
        // hash of the namespace and store name. Intention is to
766
        // have randomly filled data that is unique per store and
767
        // does not change between health checks. This is to ensure
768
        // we are not adding more data to store on each health check.
769
        let mut rng: StdRng = StdRng::seed_from_u64(hash_seed);
770
        rng.fill_bytes(&mut digest_data);
771
772
        let mut digest_hasher = default_digest_hasher_func().hasher();
773
        digest_hasher.update(&digest_data);
774
        let digest_data_len = digest_data.len() as u64;
775
        let digest_info = StoreKey::from(digest_hasher.finalize_digest());
776
777
        let digest_bytes = Bytes::copy_from_slice(&digest_data);
778
779
        if let Err(e) = self
780
            .update_oneshot(digest_info.borrow(), digest_bytes.clone())
781
            .await
782
        {
783
            warn!(?e, "check_health Store.update_oneshot() failed");
784
            return HealthStatus::new_failed(
785
                self.get_ref(),
786
                format!("Store.update_oneshot() failed: {e}").into(),
787
            );
788
        }
789
790
        match self.has(digest_info.borrow()).await {
791
            Ok(Some(s)) => {
792
                if s != digest_data_len {
793
                    return HealthStatus::new_failed(
794
                        self.get_ref(),
795
                        format!("Store.has() size mismatch {s} != {digest_data_len}").into(),
796
                    );
797
                }
798
            }
799
            Ok(None) => {
800
                return HealthStatus::new_failed(
801
                    self.get_ref(),
802
                    "Store.has() size not found".into(),
803
                );
804
            }
805
            Err(e) => {
806
                return HealthStatus::new_failed(
807
                    self.get_ref(),
808
                    format!("Store.has() failed: {e}").into(),
809
                );
810
            }
811
        }
812
813
        match self
814
            .get_part_unchunked(digest_info, 0, Some(digest_data_len))
815
            .await
816
        {
817
            Ok(b) => {
818
                if b != digest_bytes {
819
                    return HealthStatus::new_failed(
820
                        self.get_ref(),
821
                        "Store.get_part_unchunked() data mismatch".into(),
822
                    );
823
                }
824
            }
825
            Err(e) => {
826
                return HealthStatus::new_failed(
827
                    self.get_ref(),
828
                    format!("Store.get_part_unchunked() failed: {e}").into(),
829
                );
830
            }
831
        }
832
833
        HealthStatus::new_ok(self.get_ref(), "Successfully store health check".into())
834
1
    }
835
836
    /// See: [`Store::inner_store`] for details.
837
    fn inner_store(&self, _digest: Option<StoreKey<'_>>) -> &dyn StoreDriver;
838
839
    /// Returns an Any variation of whatever Self is.
840
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static);
841
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static>;
842
843
    // Register health checks used to monitor the store.
844
0
    fn register_health(self: Arc<Self>, _registry: &mut HealthRegistryBuilder) {}
845
846
    fn register_remove_callback(
847
        self: Arc<Self>,
848
        callback: Arc<dyn RemoveItemCallback>,
849
    ) -> Result<(), Error>;
850
}
851
852
// Callback to be called when a store deletes an item. This is used so
853
// compound stores can remove items from their internal state when their
854
// underlying stores remove items e.g. caches
855
pub trait RemoveItemCallback: Debug + Send + Sync {
856
    fn callback<'a>(
857
        &'a self,
858
        store_key: StoreKey<'a>,
859
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
860
}
861
862
/// The instructions on how to decode a value from a Bytes & version into
863
/// the underlying type.
864
pub trait SchedulerStoreDecodeTo {
865
    type DecodeOutput;
866
    fn decode(version: i64, data: Bytes) -> Result<Self::DecodeOutput, Error>;
867
}
868
869
pub trait SchedulerSubscription: Send + Sync {
870
    fn changed(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
871
}
872
873
pub trait SchedulerSubscriptionManager: Send + Sync {
874
    type Subscription: SchedulerSubscription;
875
876
    fn notify_for_test(&self, value: String);
877
878
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
879
    where
880
        K: SchedulerStoreKeyProvider;
881
882
    fn is_reliable() -> bool;
883
}
884
885
/// The API surface for a scheduler store.
886
pub trait SchedulerStore: Send + Sync + 'static {
887
    type SubscriptionManager: SchedulerSubscriptionManager;
888
889
    /// Returns the subscription manager for the scheduler store.
890
    fn subscription_manager(&self) -> Result<Arc<Self::SubscriptionManager>, Error>;
891
892
    /// Updates or inserts an entry into the underlying store.
893
    /// Metadata about the key is attached to the compile-time type.
894
    /// If `StoreKeyProvider::Versioned` is `TrueValue`, the data will not
895
    /// be updated if the current version in the database does not match
896
    /// the version in the passed in data.
897
    /// No guarantees are made about when `Version` is `FalseValue`.
898
    /// Indexes are guaranteed to be updated atomically with the data.
899
    fn update_data<T>(&self, data: T) -> impl Future<Output = Result<Option<i64>, Error>> + Send
900
    where
901
        T: SchedulerStoreDataProvider
902
            + SchedulerStoreKeyProvider
903
            + SchedulerCurrentVersionProvider
904
            + Send;
905
906
    /// Searches for all keys in the store that match the given index prefix.
907
    fn search_by_index_prefix<K>(
908
        &self,
909
        index: K,
910
    ) -> impl Future<
911
        Output = Result<
912
            impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
913
            Error,
914
        >,
915
    > + Send
916
    where
917
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send;
918
919
    /// Returns data for the provided key with the given version if
920
    /// `StoreKeyProvider::Versioned` is `TrueValue`.
921
    fn get_and_decode<K>(
922
        &self,
923
        key: K,
924
    ) -> impl Future<Output = Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>> + Send
925
    where
926
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send;
927
}
928
929
/// A type that is used to let the scheduler store know what
930
/// index is being requested.
931
pub trait SchedulerIndexProvider {
932
    /// Only keys inserted with this prefix will be indexed.
933
    const KEY_PREFIX: &'static str;
934
935
    /// The name of the index.
936
    const INDEX_NAME: &'static str;
937
938
    /// The sort key for the index (if any).
939
    const MAYBE_SORT_KEY: Option<&'static str> = None;
940
941
    /// If the data is versioned.
942
    type Versioned: BoolValue;
943
944
    /// The value of the index.
945
    fn index_value(&self) -> Cow<'_, str>;
946
}
947
948
/// Provides a key to lookup data in the store.
949
pub trait SchedulerStoreKeyProvider {
950
    /// If the data is versioned.
951
    type Versioned: BoolValue;
952
953
    /// Returns the key for the data.
954
    fn get_key(&self) -> StoreKey<'static>;
955
}
956
957
/// Provides data to be stored in the scheduler store.
958
pub trait SchedulerStoreDataProvider {
959
    /// Converts the data into bytes to be stored in the store.
960
    fn try_into_bytes(self) -> Result<Bytes, Error>;
961
962
    /// Returns the indexes for the data if any.
963
3
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
964
3
        Ok(Vec::new())
965
3
    }
966
}
967
968
/// Provides the current version of the data in the store.
969
pub trait SchedulerCurrentVersionProvider {
970
    /// Returns the current version of the data in the store.
971
    fn current_version(&self) -> i64;
972
}
973
974
/// Default implementation for when we are not providing a version
975
/// for the data.
976
impl<T> SchedulerCurrentVersionProvider for T
977
where
978
    T: SchedulerStoreKeyProvider<Versioned = FalseValue>,
979
{
980
0
    fn current_version(&self) -> i64 {
981
0
        0
982
0
    }
983
}
984
985
/// Compile time types for booleans.
986
pub trait BoolValue {
987
    const VALUE: bool;
988
}
989
/// Compile time check if something is false.
990
pub trait IsFalse {}
991
/// Compile time check if something is true.
992
pub trait IsTrue {}
993
994
/// Compile time true value.
995
#[derive(Debug, Clone, Copy)]
996
pub struct TrueValue;
997
impl BoolValue for TrueValue {
998
    const VALUE: bool = true;
999
}
1000
impl IsTrue for TrueValue {}
1001
1002
/// Compile time false value.
1003
#[derive(Debug, Clone, Copy)]
1004
pub struct FalseValue;
1005
impl BoolValue for FalseValue {
1006
    const VALUE: bool = false;
1007
}
1008
impl IsFalse for FalseValue {}