Coverage Report

Created: 2025-07-30 16:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/store_trait.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::borrow::{Borrow, BorrowMut};
16
use core::convert::Into;
17
use core::fmt::{self, Display};
18
use core::hash::{Hash, Hasher};
19
use core::ops::{Bound, RangeBounds};
20
use core::pin::Pin;
21
use core::ptr::addr_eq;
22
use std::borrow::Cow;
23
use std::collections::hash_map::DefaultHasher as StdHasher;
24
use std::ffi::OsString;
25
use std::sync::{Arc, OnceLock};
26
27
use async_trait::async_trait;
28
use bytes::{Bytes, BytesMut};
29
use futures::{Future, FutureExt, Stream, join, try_join};
30
use nativelink_error::{Code, Error, ResultExt, error_if, make_err};
31
use nativelink_metric::MetricsComponent;
32
use rand::rngs::StdRng;
33
use rand::{RngCore, SeedableRng};
34
use serde::{Deserialize, Serialize};
35
use tokio::io::{AsyncReadExt, AsyncSeekExt};
36
37
use crate::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair};
38
use crate::common::DigestInfo;
39
use crate::digest_hasher::{DigestHasher, DigestHasherFunc, default_digest_hasher_func};
40
use crate::fs;
41
use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
42
43
static DEFAULT_DIGEST_SIZE_HEALTH_CHECK: OnceLock<usize> = OnceLock::new();
44
/// Default digest size for health check data. Any change in this value
45
/// changes the default contract. `GlobalConfig` should be updated to reflect
46
/// changes in this value.
47
pub const DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG: usize = 1024 * 1024;
48
49
// Get the default digest size for health check data, if value is unset a system wide default is used.
50
0
pub fn default_digest_size_health_check() -> usize {
51
0
    *DEFAULT_DIGEST_SIZE_HEALTH_CHECK.get_or_init(|| DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG)
52
0
}
53
54
/// Set the default digest size for health check data, this should be called once.
55
0
pub fn set_default_digest_size_health_check(size: usize) -> Result<(), Error> {
56
0
    DEFAULT_DIGEST_SIZE_HEALTH_CHECK.set(size).map_err(|_| {
57
0
        make_err!(
58
0
            Code::Internal,
59
            "set_default_digest_size_health_check already set"
60
        )
61
0
    })
62
0
}
63
64
#[derive(Debug, PartialEq, Eq, Copy, Clone, Serialize, Deserialize)]
65
pub enum UploadSizeInfo {
66
    /// When the data transfer amount is known to be exact size, this enum should be used.
67
    /// The receiver store can use this to better optimize the way the data is sent or stored.
68
    ExactSize(u64),
69
70
    /// When the data transfer amount is not known to be exact, the caller should use this enum
71
    /// to provide the maximum size that could possibly be sent. This will bypass the exact size
72
    /// checks, but still provide useful information to the underlying store about the data being
73
    /// sent that it can then use to optimize the upload process.
74
    MaxSize(u64),
75
}
76
77
/// Utility to send all the data to the store from a file.
78
// Note: This is not inlined because some code may want to bypass any underlying
79
// optimizations that may be present in the inner store.
80
8
pub async fn slow_update_store_with_file<S: StoreDriver + ?Sized>(
81
8
    store: Pin<&S>,
82
8
    digest: impl Into<StoreKey<'_>>,
83
8
    file: &mut fs::FileSlot,
84
8
    upload_size: UploadSizeInfo,
85
8
) -> Result<(), Error> {
86
8
    file.rewind()
87
8
        .await
88
8
        .err_tip(|| "Failed to rewind in upload_file_to_store")
?0
;
89
8
    let (mut tx, rx) = make_buf_channel_pair();
90
91
8
    let update_fut = store
92
8
        .update(digest.into(), rx, upload_size)
93
8
        .map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store"));
94
8
    let read_data_fut = async move {
95
        loop {
96
77
            let mut buf = BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE);
97
77
            let read = file
98
77
                .read_buf(&mut buf)
99
77
                .await
100
77
                .err_tip(|| "Failed to read in upload_file_to_store")
?0
;
101
77
            if read == 0 {
  Branch (101:16): [True: 7, False: 5]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [Folded - Ignored]
  Branch (101:16): [True: 1, False: 64]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [Folded - Ignored]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
  Branch (101:16): [True: 0, False: 0]
102
8
                break;
103
69
            }
104
69
            tx.send(buf.freeze())
105
69
                .await
106
69
                .err_tip(|| "Failed to send in upload_file_to_store")
?0
;
107
        }
108
8
        tx.send_eof()
109
8
            .err_tip(|| "Could not send EOF to store in upload_file_to_store")
110
8
    };
111
8
    tokio::pin!(read_data_fut);
112
8
    let (update_res, read_res) = tokio::join!(update_fut, read_data_fut);
113
8
    update_res.merge(read_res)
114
8
}
115
116
/// Optimizations that stores may want to expose to the callers.
117
/// This is useful for specific cases when the store can optimize the processing
118
/// of the data being processed.
119
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
120
pub enum StoreOptimizations {
121
    /// The store can optimize the upload process when it knows the data is coming from a file.
122
    FileUpdates,
123
124
    /// If the store will ignore the data uploads.
125
    NoopUpdates,
126
127
    /// If the store will never serve downloads.
128
    NoopDownloads,
129
}
130
131
/// A wrapper struct for [`StoreKey`] to work around
132
/// lifetime limitations in `HashMap::get()` as described in
133
/// <https://github.com/rust-lang/rust/issues/80389>
134
///
135
/// As such this is a wrapper type that is stored in the
136
/// maps using the workaround as described in
137
/// <https://blinsay.com/blog/compound-keys/>
138
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
139
#[repr(transparent)]
140
pub struct StoreKeyBorrow(StoreKey<'static>);
141
142
impl From<StoreKey<'static>> for StoreKeyBorrow {
143
5.87k
    fn from(key: StoreKey<'static>) -> Self {
144
5.87k
        Self(key)
145
5.87k
    }
146
}
147
148
impl From<StoreKeyBorrow> for StoreKey<'static> {
149
0
    fn from(key_borrow: StoreKeyBorrow) -> Self {
150
0
        key_borrow.0
151
0
    }
152
}
153
154
impl<'a> Borrow<StoreKey<'a>> for StoreKeyBorrow {
155
4.66k
    fn borrow(&self) -> &StoreKey<'a> {
156
4.66k
        &self.0
157
4.66k
    }
158
}
159
160
impl<'a> Borrow<StoreKey<'a>> for &StoreKeyBorrow {
161
0
    fn borrow(&self) -> &StoreKey<'a> {
162
0
        &self.0
163
0
    }
164
}
165
166
/// Holds something that can be converted into a key the
167
/// store API can understand. Generally this is a digest
168
/// but it can also be a string if the caller wishes to
169
/// store the data directly and reference it by a string
170
/// directly.
171
#[derive(Debug, Eq)]
172
pub enum StoreKey<'a> {
173
    /// A string key.
174
    Str(Cow<'a, str>),
175
176
    /// A key that is a digest.
177
    Digest(DigestInfo),
178
}
179
180
impl<'a> StoreKey<'a> {
181
    /// Creates a new store key from a string.
182
22
    pub const fn new_str(s: &'a str) -> Self {
183
22
        StoreKey::Str(Cow::Borrowed(s))
184
22
    }
185
186
    /// Returns a shallow clone of the key.
187
    /// This is extremely cheap and should be used when clone
188
    /// is needed but the key is not going to be modified.
189
    #[must_use]
190
    #[allow(
191
        clippy::missing_const_for_fn,
192
        reason = "False positive on stable, but not on nightly"
193
    )]
194
6.88k
    pub fn borrow(&'a self) -> Self {
195
60
        match self {
196
22
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Borrowed(s)),
197
38
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Borrowed(s)),
198
6.82k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
199
        }
200
6.88k
    }
201
202
    /// Converts the key into an owned version. This is useful
203
    /// when the caller needs an owned version of the key.
204
7.29k
    pub fn into_owned(self) -> StoreKey<'static> {
205
60
        match self {
206
7
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Owned(s)),
207
53
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Owned(s.to_owned())),
208
7.23k
            StoreKey::Digest(d) => StoreKey::Digest(d),
209
        }
210
7.29k
    }
211
212
    /// Converts the key into a digest. This is useful when the caller
213
    /// must have a digest key. If the data is not a digest, it may
214
    /// hash the underlying key and return a digest of the hash of the key
215
153
    pub fn into_digest(self) -> DigestInfo {
216
153
        match self {
217
151
            StoreKey::Digest(digest) => digest,
218
2
            StoreKey::Str(s) => {
219
2
                let mut hasher = DigestHasherFunc::Blake3.hasher();
220
2
                hasher.update(s.as_bytes());
221
2
                hasher.finalize_digest()
222
            }
223
        }
224
153
    }
225
226
    /// Returns the key as a string. If the key is a digest, it will
227
    /// return a string representation of the digest. If the key is a string,
228
    /// it will return the string itself.
229
75
    pub fn as_str(&'a self) -> Cow<'a, str> {
230
26
        match self {
231
26
            StoreKey::Str(Cow::Owned(s)) => Cow::Borrowed(s),
232
0
            StoreKey::Str(Cow::Borrowed(s)) => Cow::Borrowed(s),
233
49
            StoreKey::Digest(d) => Cow::Owned(format!("{d}")),
234
        }
235
75
    }
236
}
237
238
impl Clone for StoreKey<'static> {
239
5.88k
    fn clone(&self) -> Self {
240
5.88k
        match self {
241
20
            StoreKey::Str(s) => StoreKey::Str(s.clone()),
242
5.86k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
243
        }
244
5.88k
    }
245
}
246
247
impl PartialOrd for StoreKey<'_> {
248
36
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
249
36
        Some(self.cmp(other))
250
36
    }
251
}
252
253
impl Ord for StoreKey<'_> {
254
53
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
255
53
        match (self, other) {
256
53
            (StoreKey::Str(a), StoreKey::Str(b)) => a.cmp(b),
257
0
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a.cmp(b),
258
0
            (StoreKey::Str(_), StoreKey::Digest(_)) => core::cmp::Ordering::Less,
259
0
            (StoreKey::Digest(_), StoreKey::Str(_)) => core::cmp::Ordering::Greater,
260
        }
261
53
    }
262
}
263
264
impl PartialEq for StoreKey<'_> {
265
5.61k
    fn eq(&self, other: &Self) -> bool {
266
5.61k
        match (self, other) {
267
50
            (StoreKey::Str(a), StoreKey::Str(b)) => a == b,
268
5.56k
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a == b,
269
0
            _ => false,
270
        }
271
5.61k
    }
272
}
273
274
impl Hash for StoreKey<'_> {
275
25.2k
    fn hash<H: Hasher>(&self, state: &mut H) {
276
        /// Salts the hash with the enum value that represents
277
        /// the type of the key.
278
        #[repr(u8)]
279
        enum HashId {
280
            Str = 0,
281
            Digest = 1,
282
        }
283
25.2k
        match self {
284
42
            StoreKey::Str(s) => {
285
42
                (HashId::Str as u8).hash(state);
286
42
                s.hash(state);
287
42
            }
288
25.2k
            StoreKey::Digest(d) => {
289
25.2k
                (HashId::Digest as u8).hash(state);
290
25.2k
                d.hash(state);
291
25.2k
            }
292
        }
293
25.2k
    }
294
}
295
296
impl<'a> From<&'a str> for StoreKey<'a> {
297
1
    fn from(s: &'a str) -> Self {
298
1
        StoreKey::Str(Cow::Borrowed(s))
299
1
    }
300
}
301
302
impl From<String> for StoreKey<'static> {
303
0
    fn from(s: String) -> Self {
304
0
        StoreKey::Str(Cow::Owned(s))
305
0
    }
306
}
307
308
impl From<DigestInfo> for StoreKey<'_> {
309
10.4k
    fn from(d: DigestInfo) -> Self {
310
10.4k
        StoreKey::Digest(d)
311
10.4k
    }
312
}
313
314
impl From<&DigestInfo> for StoreKey<'_> {
315
39
    fn from(d: &DigestInfo) -> Self {
316
39
        StoreKey::Digest(*d)
317
39
    }
318
}
319
320
// mostly for use with tracing::Value
321
impl Display for StoreKey<'_> {
322
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
323
0
        match self {
324
0
            StoreKey::Str(s) => {
325
0
                write!(f, "{s}")
326
            }
327
0
            StoreKey::Digest(d) => {
328
0
                write!(f, "Digest: {d}")
329
            }
330
        }
331
0
    }
332
}
333
334
#[derive(Clone, MetricsComponent)]
335
#[repr(transparent)]
336
pub struct Store {
337
    #[metric]
338
    inner: Arc<dyn StoreDriver>,
339
}
340
341
impl fmt::Debug for Store {
342
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343
0
        f.debug_struct("Store").finish_non_exhaustive()
344
0
    }
345
}
346
347
impl Store {
348
279
    pub fn new(inner: Arc<dyn StoreDriver>) -> Self {
349
279
        Self { inner }
350
279
    }
351
}
352
353
impl Store {
354
    /// Returns the immediate inner store driver.
355
    /// Note: This does not recursively try to resolve underlying store drivers
356
    /// like `.inner_store()` does.
357
    #[inline]
358
1
    pub fn into_inner(self) -> Arc<dyn StoreDriver> {
359
1
        self.inner
360
1
    }
361
362
    /// Gets the underlying store for the given digest.
363
    /// A caller might want to use this to obtain a reference to the "real" underlying store
364
    /// (if applicable) and check if it implements some special traits that allow optimizations.
365
    /// Note: If the store performs complex operations on the data, it should return itself.
366
    #[inline]
367
176
    pub fn inner_store<'a, K: Into<StoreKey<'a>>>(&self, digest: Option<K>) -> &dyn StoreDriver {
368
176
        self.inner.inner_store(digest.map(Into::into))
369
176
    }
370
371
    /// Tries to cast the underlying store to the given type.
372
    #[inline]
373
67
    pub fn downcast_ref<U: StoreDriver>(&self, maybe_digest: Option<StoreKey<'_>>) -> Option<&U> {
374
67
        self.inner.inner_store(maybe_digest).as_any().downcast_ref()
375
67
    }
376
377
    /// Register health checks used to monitor the store.
378
    #[inline]
379
0
    pub fn register_health(&self, registry: &mut HealthRegistryBuilder) {
380
0
        self.inner.clone().register_health(registry);
381
0
    }
382
}
383
384
impl StoreLike for Store {
385
    #[inline]
386
10.0k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
387
10.0k
        self.inner.as_ref()
388
10.0k
    }
389
390
7
    fn as_pin(&self) -> Pin<&Self> {
391
7
        Pin::new(self)
392
7
    }
393
}
394
395
impl<T> StoreLike for T
396
where
397
    T: StoreDriver + Sized,
398
{
399
    #[inline]
400
8.00k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
401
8.00k
        self
402
8.00k
    }
403
404
90
    fn as_pin(&self) -> Pin<&Self> {
405
90
        Pin::new(self)
406
90
    }
407
}
408
409
pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
410
    /// Returns the immediate inner store driver.
411
    fn as_store_driver(&self) -> &'_ dyn StoreDriver;
412
413
    /// Utility function to return a pinned reference to self.
414
    fn as_pin(&self) -> Pin<&Self>;
415
416
    /// Utility function to return a pinned reference to the store driver.
417
    #[inline]
418
18.0k
    fn as_store_driver_pin(&self) -> Pin<&'_ dyn StoreDriver> {
419
18.0k
        Pin::new(self.as_store_driver())
420
18.0k
    }
421
422
    /// Look up a digest in the store and return None if it does not exist in
423
    /// the store, or Some(size) if it does.
424
    /// Note: On an AC store the size will be incorrect and should not be used!
425
    #[inline]
426
233
    fn has<'a>(
427
233
        &'a self,
428
233
        digest: impl Into<StoreKey<'a>>,
429
233
    ) -> impl Future<Output = Result<Option<u64>, Error>> + 'a {
430
233
        self.as_store_driver_pin().has(digest.into())
431
233
    }
432
433
    /// Look up a list of digests in the store and return a result for each in
434
    /// the same order as input.  The result will either be None if it does not
435
    /// exist in the store, or Some(size) if it does.
436
    /// Note: On an AC store the size will be incorrect and should not be used!
437
    #[inline]
438
19
    fn has_many<'a>(
439
19
        &'a self,
440
19
        digests: &'a [StoreKey<'a>],
441
19
    ) -> impl Future<Output = Result<Vec<Option<u64>>, Error>> + Send + 'a {
442
19
        self.as_store_driver_pin().has_many(digests)
443
19
    }
444
445
    /// The implementation of the above has and `has_many` functions.  See their
446
    /// documentation for details.
447
    #[inline]
448
42
    fn has_with_results<'a>(
449
42
        &'a self,
450
42
        digests: &'a [StoreKey<'a>],
451
42
        results: &'a mut [Option<u64>],
452
42
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
453
42
        self.as_store_driver_pin()
454
42
            .has_with_results(digests, results)
455
42
    }
456
457
    /// List all the keys in the store that are within the given range.
458
    /// `handler` is called for each key in the range. If `handler` returns
459
    /// false, the listing is stopped.
460
    ///
461
    /// The number of keys passed through the handler is the return value.
462
    #[inline]
463
15
    fn list<'a, 'b>(
464
15
        &'a self,
465
15
        range: impl RangeBounds<StoreKey<'b>> + Send + 'b,
466
15
        mut handler: impl for<'c> FnMut(&'c StoreKey) -> bool + Send + Sync + 'a,
467
15
    ) -> impl Future<Output = Result<u64, Error>> + Send + 'a
468
15
    where
469
15
        'b: 'a,
470
    {
471
        // Note: We use a manual async move, so the future can own the `range` and `handler`,
472
        // otherwise we'd require the caller to pass them in by reference making more borrow
473
        // checker noise.
474
15
        async move {
475
15
            self.as_store_driver_pin()
476
15
                .list(
477
15
                    (
478
15
                        range.start_bound().map(StoreKey::borrow),
479
15
                        range.end_bound().map(StoreKey::borrow),
480
15
                    ),
481
15
                    &mut handler,
482
15
                )
483
15
                .await
484
15
        }
485
15
    }
486
487
    /// Sends the data to the store.
488
    #[inline]
489
5.23k
    fn update<'a>(
490
5.23k
        &'a self,
491
5.23k
        digest: impl Into<StoreKey<'a>>,
492
5.23k
        reader: DropCloserReadHalf,
493
5.23k
        upload_size: UploadSizeInfo,
494
5.23k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
495
5.23k
        self.as_store_driver_pin()
496
5.23k
            .update(digest.into(), reader, upload_size)
497
5.23k
    }
498
499
    /// Any optimizations the store might want to expose to the callers.
500
    /// By default, no optimizations are exposed.
501
    #[inline]
502
14
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
503
14
        self.as_store_driver_pin().optimized_for(optimization)
504
14
    }
505
506
    /// Specialized version of `.update()` which takes a `FileSlot`.
507
    /// This is useful if the underlying store can optimize the upload process
508
    /// when it knows the data is coming from a file.
509
    #[inline]
510
10
    fn update_with_whole_file<'a>(
511
10
        &'a self,
512
10
        digest: impl Into<StoreKey<'a>>,
513
10
        path: OsString,
514
10
        file: fs::FileSlot,
515
10
        upload_size: UploadSizeInfo,
516
10
    ) -> impl Future<Output = Result<Option<fs::FileSlot>, Error>> + Send + 'a {
517
10
        self.as_store_driver_pin()
518
10
            .update_with_whole_file(digest.into(), path, file, upload_size)
519
10
    }
520
521
    /// Utility to send all the data to the store when you have all the bytes.
522
    #[inline]
523
5.66k
    fn update_oneshot<'a>(
524
5.66k
        &'a self,
525
5.66k
        digest: impl Into<StoreKey<'a>>,
526
5.66k
        data: Bytes,
527
5.66k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
528
5.66k
        self.as_store_driver_pin()
529
5.66k
            .update_oneshot(digest.into(), data)
530
5.66k
    }
531
532
    /// Retrieves part of the data from the store and writes it to the given writer.
533
    #[inline]
534
1.31k
    fn get_part<'a>(
535
1.31k
        &'a self,
536
1.31k
        digest: impl Into<StoreKey<'a>>,
537
1.31k
        mut writer: impl BorrowMut<DropCloserWriteHalf> + Send + 'a,
538
1.31k
        offset: u64,
539
1.31k
        length: Option<u64>,
540
1.31k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
541
1.31k
        let key = digest.into();
542
        // Note: We need to capture `writer` just in case the caller
543
        // expects the drop() method to be called on it when the future
544
        // is done due to the complex interaction between the DropCloserWriteHalf
545
        // and the DropCloserReadHalf during drop().
546
1.31k
        async move {
547
1.31k
            self.as_store_driver_pin()
548
1.31k
                .get_part(key, writer.borrow_mut(), offset, length)
549
1.31k
                .await
550
1.31k
        }
551
1.31k
    }
552
553
    /// Utility that works the same as `.get_part()`, but writes all the data.
554
    #[inline]
555
15
    fn get<'a>(
556
15
        &'a self,
557
15
        key: impl Into<StoreKey<'a>>,
558
15
        writer: DropCloserWriteHalf,
559
15
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
560
15
        self.as_store_driver_pin().get(key.into(), writer)
561
15
    }
562
563
    /// Utility that will return all the bytes at once instead of in a streaming manner.
564
    #[inline]
565
5.25k
    fn get_part_unchunked<'a>(
566
5.25k
        &'a self,
567
5.25k
        key: impl Into<StoreKey<'a>>,
568
5.25k
        offset: u64,
569
5.25k
        length: Option<u64>,
570
5.25k
    ) -> impl Future<Output = Result<Bytes, Error>> + Send + 'a {
571
5.25k
        self.as_store_driver_pin()
572
5.25k
            .get_part_unchunked(key.into(), offset, length)
573
5.25k
    }
574
575
    /// Default implementation of the health check. Some stores may want to override this
576
    /// in situations where the default implementation is not sufficient.
577
    #[inline]
578
0
    fn check_health(
579
0
        &self,
580
0
        namespace: Cow<'static, str>,
581
0
    ) -> impl Future<Output = HealthStatus> + Send {
582
0
        self.as_store_driver_pin().check_health(namespace)
583
0
    }
584
}
585
586
#[async_trait]
587
pub trait StoreDriver:
588
    Sync + Send + Unpin + MetricsComponent + HealthStatusIndicator + 'static
589
{
590
    /// See: [`StoreLike::has`] for details.
591
    #[inline]
592
498
    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
593
249
        let mut result = [None];
594
249
        self.has_with_results(&[key], &mut result).await
?2
;
595
247
        Ok(result[0])
596
498
    }
597
598
    /// See: [`StoreLike::has_many`] for details.
599
    #[inline]
600
    async fn has_many(
601
        self: Pin<&Self>,
602
        digests: &[StoreKey<'_>],
603
38
    ) -> Result<Vec<Option<u64>>, Error> {
604
19
        let mut results = vec![None; digests.len()];
605
19
        self.has_with_results(digests, &mut results).await
?0
;
606
19
        Ok(results)
607
38
    }
608
609
    /// See: [`StoreLike::has_with_results`] for details.
610
    async fn has_with_results(
611
        self: Pin<&Self>,
612
        digests: &[StoreKey<'_>],
613
        results: &mut [Option<u64>],
614
    ) -> Result<(), Error>;
615
616
    /// See: [`StoreLike::list`] for details.
617
    async fn list(
618
        self: Pin<&Self>,
619
        _range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
620
        _handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
621
0
    ) -> Result<u64, Error> {
622
        // TODO(aaronmondal) We should force all stores to implement this function instead of
623
        // providing a default implementation.
624
0
        Err(make_err!(
625
0
            Code::Unimplemented,
626
0
            "Store::list() not implemented for this store"
627
0
        ))
628
0
    }
629
630
    /// See: [`StoreLike::update`] for details.
631
    async fn update(
632
        self: Pin<&Self>,
633
        key: StoreKey<'_>,
634
        reader: DropCloserReadHalf,
635
        upload_size: UploadSizeInfo,
636
    ) -> Result<(), Error>;
637
638
    /// See: [`StoreLike::optimized_for`] for details.
639
96
    fn optimized_for(&self, _optimization: StoreOptimizations) -> bool {
640
96
        false
641
96
    }
642
643
    /// See: [`StoreLike::update_with_whole_file`] for details.
644
    async fn update_with_whole_file(
645
        self: Pin<&Self>,
646
        key: StoreKey<'_>,
647
        path: OsString,
648
        mut file: fs::FileSlot,
649
        upload_size: UploadSizeInfo,
650
2
    ) -> Result<Option<fs::FileSlot>, Error> {
651
1
        let inner_store = self.inner_store(Some(key.borrow()));
652
1
        if inner_store.optimized_for(StoreOptimizations::FileUpdates) {
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [Folded - Ignored]
  Branch (652:12): [True: 0, False: 1]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [Folded - Ignored]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
  Branch (652:12): [True: 0, False: 0]
653
0
            error_if!(
654
0
                addr_eq(inner_store, &raw const *self),
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [Folded - Ignored]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [Folded - Ignored]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
  Branch (654:17): [True: 0, False: 0]
655
                "Store::inner_store() returned self when optimization present"
656
            );
657
0
            return Pin::new(inner_store)
658
0
                .update_with_whole_file(key, path, file, upload_size)
659
0
                .await;
660
1
        }
661
1
        slow_update_store_with_file(self, key, &mut file, upload_size).await
?0
;
662
1
        Ok(Some(file))
663
2
    }
664
665
    /// See: [`StoreLike::update_oneshot`] for details.
666
11.5k
    async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> {
667
        // TODO(aaronmondal) This is extremely inefficient, since we have exactly
668
        // what we need here. Maybe we could instead make a version of the stream
669
        // that can take objects already fully in memory instead?
670
5.76k
        let (mut tx, rx) = make_buf_channel_pair();
671
672
5.76k
        let data_len =
673
5.76k
            u64::try_from(data.len()).err_tip(|| "Could not convert data.len() to u64")
?0
;
674
5.76k
        let send_fut = async move {
675
            // Only send if we are not EOF.
676
5.76k
            if !data.is_empty() {
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 34, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [Folded - Ignored]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 1, False: 0]
  Branch (676:16): [True: 16, False: 40]
  Branch (676:16): [True: 8, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 4, False: 1]
  Branch (676:16): [True: 7, False: 0]
  Branch (676:16): [True: 90, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 2, False: 0]
  Branch (676:16): [True: 2, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 4, False: 0]
  Branch (676:16): [True: 1, False: 0]
  Branch (676:16): [True: 2, False: 0]
  Branch (676:16): [True: 2, False: 0]
  Branch (676:16): [True: 2, False: 0]
  Branch (676:16): [True: 2, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 11, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 406, False: 1]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [Folded - Ignored]
  Branch (676:16): [True: 3, False: 2]
  Branch (676:16): [True: 2, False: 0]
  Branch (676:16): [True: 1, False: 0]
  Branch (676:16): [True: 13, False: 1]
  Branch (676:16): [True: 51, False: 32]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 1, False: 0]
  Branch (676:16): [True: 5.00k, False: 0]
  Branch (676:16): [True: 6, False: 0]
  Branch (676:16): [True: 4, False: 0]
  Branch (676:16): [True: 2, False: 0]
  Branch (676:16): [True: 0, False: 0]
  Branch (676:16): [True: 7, False: 0]
677
5.68k
                tx.send(data)
678
5.68k
                    .await
679
5.68k
                    .err_tip(|| "Failed to write data in update_oneshot")
?0
;
680
77
            }
681
5.76k
            tx.send_eof()
682
5.76k
                .err_tip(|| "Failed to write EOF in update_oneshot")
?0
;
683
5.76k
            Ok(())
684
5.76k
        };
685
5.76k
        try_join!(
686
5.76k
            send_fut,
687
5.76k
            self.update(key, rx, UploadSizeInfo::ExactSize(data_len))
688
2
        )?;
689
5.76k
        Ok(())
690
11.5k
    }
691
692
    /// See: [`StoreLike::get_part`] for details.
693
    async fn get_part(
694
        self: Pin<&Self>,
695
        key: StoreKey<'_>,
696
        writer: &mut DropCloserWriteHalf,
697
        offset: u64,
698
        length: Option<u64>,
699
    ) -> Result<(), Error>;
700
701
    /// See: [`StoreLike::get`] for details.
702
    #[inline]
703
    async fn get(
704
        self: Pin<&Self>,
705
        key: StoreKey<'_>,
706
        mut writer: DropCloserWriteHalf,
707
38
    ) -> Result<(), Error> {
708
19
        self.get_part(key, &mut writer, 0, None).await
709
37
    }
710
711
    /// See: [`StoreLike::get_part_unchunked`] for details.
712
    async fn get_part_unchunked(
713
        self: Pin<&Self>,
714
        key: StoreKey<'_>,
715
        offset: u64,
716
        length: Option<u64>,
717
10.7k
    ) -> Result<Bytes, Error> {
718
5.36k
        let length_usize = length
719
5.36k
            .map(|v| 
usize::try_from2.24k
(
v2.24k
).
err_tip2.24k
(|| "Could not convert length to usize"))
720
5.36k
            .transpose()
?0
;
721
722
        // TODO(aaronmondal) This is extremely inefficient, since we have exactly
723
        // what we need here. Maybe we could instead make a version of the stream
724
        // that can take objects already fully in memory instead?
725
5.36k
        let (mut tx, mut rx) = make_buf_channel_pair();
726
727
5.36k
        let (data_res, get_part_res) = join!(
728
5.36k
            rx.consume(length_usize),
729
            // We use a closure here to ensure that the `tx` is dropped when the
730
            // future is done.
731
5.36k
            async move { self.get_part(key, &mut tx, offset, length).await },
732
        );
733
5.36k
        get_part_res
734
5.36k
            .err_tip(|| "Failed to get_part in get_part_unchunked")
735
5.36k
            .merge(data_res.err_tip(|| "Failed to read stream to completion in get_part_unchunked"))
736
10.7k
    }
737
738
    /// See: [`StoreLike::check_health`] for details.
739
0
    async fn check_health(self: Pin<&Self>, namespace: Cow<'static, str>) -> HealthStatus {
740
0
        let digest_data_size = default_digest_size_health_check();
741
0
        let mut digest_data = vec![0u8; digest_data_size];
742
743
0
        let mut namespace_hasher = StdHasher::new();
744
0
        namespace.hash(&mut namespace_hasher);
745
0
        self.get_name().hash(&mut namespace_hasher);
746
0
        let hash_seed = namespace_hasher.finish();
747
748
        // Fill the digest data with random data based on a stable
749
        // hash of the namespace and store name. Intention is to
750
        // have randomly filled data that is unique per store and
751
        // does not change between health checks. This is to ensure
752
        // we are not adding more data to store on each health check.
753
0
        let mut rng: StdRng = StdRng::seed_from_u64(hash_seed);
754
0
        rng.fill_bytes(&mut digest_data);
755
756
0
        let mut digest_hasher = default_digest_hasher_func().hasher();
757
0
        digest_hasher.update(&digest_data);
758
0
        let digest_data_len = digest_data.len() as u64;
759
0
        let digest_info = StoreKey::from(digest_hasher.finalize_digest());
760
761
0
        let digest_bytes = Bytes::copy_from_slice(&digest_data);
762
763
0
        if let Err(e) = self
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [Folded - Ignored]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [Folded - Ignored]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
  Branch (763:16): [True: 0, False: 0]
764
0
            .update_oneshot(digest_info.borrow(), digest_bytes.clone())
765
0
            .await
766
        {
767
0
            return HealthStatus::new_failed(
768
0
                self.get_ref(),
769
0
                format!("Store.update_oneshot() failed: {e}").into(),
770
            );
771
0
        }
772
773
0
        match self.has(digest_info.borrow()).await {
774
0
            Ok(Some(s)) => {
775
0
                if s != digest_data_len {
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [Folded - Ignored]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [Folded - Ignored]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
776
0
                    return HealthStatus::new_failed(
777
0
                        self.get_ref(),
778
0
                        format!("Store.has() size mismatch {s} != {digest_data_len}").into(),
779
                    );
780
0
                }
781
            }
782
            Ok(None) => {
783
0
                return HealthStatus::new_failed(
784
0
                    self.get_ref(),
785
0
                    "Store.has() size not found".into(),
786
                );
787
            }
788
0
            Err(e) => {
789
0
                return HealthStatus::new_failed(
790
0
                    self.get_ref(),
791
0
                    format!("Store.has() failed: {e}").into(),
792
                );
793
            }
794
        }
795
796
0
        match self
797
0
            .get_part_unchunked(digest_info, 0, Some(digest_data_len))
798
0
            .await
799
        {
800
0
            Ok(b) => {
801
0
                if b != digest_bytes {
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [Folded - Ignored]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [Folded - Ignored]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
  Branch (801:20): [True: 0, False: 0]
802
0
                    return HealthStatus::new_failed(
803
0
                        self.get_ref(),
804
0
                        "Store.get_part_unchunked() data mismatch".into(),
805
                    );
806
0
                }
807
            }
808
0
            Err(e) => {
809
0
                return HealthStatus::new_failed(
810
0
                    self.get_ref(),
811
0
                    format!("Store.get_part_unchunked() failed: {e}").into(),
812
                );
813
            }
814
        }
815
816
0
        HealthStatus::new_ok(self.get_ref(), "Successfully store health check".into())
817
0
    }
818
819
    /// See: [`Store::inner_store`] for details.
820
    fn inner_store(&self, _digest: Option<StoreKey<'_>>) -> &dyn StoreDriver;
821
822
    /// Returns an Any variation of whatever Self is.
823
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static);
824
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static>;
825
826
    // Register health checks used to monitor the store.
827
0
    fn register_health(self: Arc<Self>, _registry: &mut HealthRegistryBuilder) {}
828
}
829
830
/// The instructions on how to decode a value from a Bytes & version into
831
/// the underlying type.
832
pub trait SchedulerStoreDecodeTo {
833
    type DecodeOutput;
834
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error>;
835
}
836
837
pub trait SchedulerSubscription: Send + Sync {
838
    fn changed(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
839
}
840
841
pub trait SchedulerSubscriptionManager: Send + Sync {
842
    type Subscription: SchedulerSubscription;
843
844
    fn notify_for_test(&self, value: String);
845
846
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
847
    where
848
        K: SchedulerStoreKeyProvider;
849
}
850
851
/// The API surface for a scheduler store.
852
pub trait SchedulerStore: Send + Sync + 'static {
853
    type SubscriptionManager: SchedulerSubscriptionManager;
854
855
    /// Returns the subscription manager for the scheduler store.
856
    fn subscription_manager(&self) -> Result<Arc<Self::SubscriptionManager>, Error>;
857
858
    /// Updates or inserts an entry into the underlying store.
859
    /// Metadata about the key is attached to the compile-time type.
860
    /// If `StoreKeyProvider::Versioned` is `TrueValue`, the data will not
861
    /// be updated if the current version in the database does not match
862
    /// the version in the passed in data.
863
    /// No guarantees are made about when `Version` is `FalseValue`.
864
    /// Indexes are guaranteed to be updated atomically with the data.
865
    fn update_data<T>(&self, data: T) -> impl Future<Output = Result<Option<u64>, Error>> + Send
866
    where
867
        T: SchedulerStoreDataProvider
868
            + SchedulerStoreKeyProvider
869
            + SchedulerCurrentVersionProvider
870
            + Send;
871
872
    /// Searches for all keys in the store that match the given index prefix.
873
    fn search_by_index_prefix<K>(
874
        &self,
875
        index: K,
876
    ) -> impl Future<
877
        Output = Result<
878
            impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
879
            Error,
880
        >,
881
    > + Send
882
    where
883
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send;
884
885
    /// Returns data for the provided key with the given version if
886
    /// `StoreKeyProvider::Versioned` is `TrueValue`.
887
    fn get_and_decode<K>(
888
        &self,
889
        key: K,
890
    ) -> impl Future<Output = Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>> + Send
891
    where
892
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send;
893
}
894
895
/// A type that is used to let the scheduler store know what
896
/// index is being requested.
897
pub trait SchedulerIndexProvider {
898
    /// Only keys inserted with this prefix will be indexed.
899
    const KEY_PREFIX: &'static str;
900
901
    /// The name of the index.
902
    const INDEX_NAME: &'static str;
903
904
    /// The sort key for the index (if any).
905
    const MAYBE_SORT_KEY: Option<&'static str> = None;
906
907
    /// If the data is versioned.
908
    type Versioned: BoolValue;
909
910
    /// The value of the index.
911
    fn index_value(&self) -> Cow<'_, str>;
912
}
913
914
/// Provides a key to lookup data in the store.
915
pub trait SchedulerStoreKeyProvider {
916
    /// If the data is versioned.
917
    type Versioned: BoolValue;
918
919
    /// Returns the key for the data.
920
    fn get_key(&self) -> StoreKey<'static>;
921
}
922
923
/// Provides data to be stored in the scheduler store.
924
pub trait SchedulerStoreDataProvider {
925
    /// Converts the data into bytes to be stored in the store.
926
    fn try_into_bytes(self) -> Result<Bytes, Error>;
927
928
    /// Returns the indexes for the data if any.
929
3
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
930
3
        Ok(Vec::new())
931
3
    }
932
}
933
934
/// Provides the current version of the data in the store.
935
pub trait SchedulerCurrentVersionProvider {
936
    /// Returns the current version of the data in the store.
937
    fn current_version(&self) -> u64;
938
}
939
940
/// Default implementation for when we are not providing a version
941
/// for the data.
942
impl<T> SchedulerCurrentVersionProvider for T
943
where
944
    T: SchedulerStoreKeyProvider<Versioned = FalseValue>,
945
{
946
0
    fn current_version(&self) -> u64 {
947
0
        0
948
0
    }
949
}
950
951
/// Compile time types for booleans.
952
pub trait BoolValue {
953
    const VALUE: bool;
954
}
955
/// Compile time check if something is false.
956
pub trait IsFalse {}
957
/// Compile time check if something is true.
958
pub trait IsTrue {}
959
960
/// Compile time true value.
961
#[derive(Debug, Clone, Copy)]
962
pub struct TrueValue;
963
impl BoolValue for TrueValue {
964
    const VALUE: bool = true;
965
}
966
impl IsTrue for TrueValue {}
967
968
/// Compile time false value.
969
#[derive(Debug, Clone, Copy)]
970
pub struct FalseValue;
971
impl BoolValue for FalseValue {
972
    const VALUE: bool = false;
973
}
974
impl IsFalse for FalseValue {}