Coverage Report

Created: 2024-12-19 04:04

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/store_trait.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::{Borrow, BorrowMut, Cow};
16
use std::collections::hash_map::DefaultHasher as StdHasher;
17
use std::convert::Into;
18
use std::hash::{Hash, Hasher};
19
use std::ops::{Bound, RangeBounds};
20
use std::pin::Pin;
21
use std::ptr::addr_eq;
22
use std::sync::{Arc, OnceLock};
23
24
use async_trait::async_trait;
25
use bytes::{Bytes, BytesMut};
26
use futures::future::{select, Either};
27
use futures::{join, try_join, Future, FutureExt, Stream};
28
use nativelink_error::{error_if, make_err, Code, Error, ResultExt};
29
use nativelink_metric::MetricsComponent;
30
use rand::rngs::StdRng;
31
use rand::{RngCore, SeedableRng};
32
use serde::{Deserialize, Serialize};
33
use tokio::io::AsyncSeekExt;
34
use tokio::time::timeout;
35
36
use crate::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
37
use crate::common::DigestInfo;
38
use crate::digest_hasher::{default_digest_hasher_func, DigestHasher, DigestHasherFunc};
39
use crate::fs::{self, idle_file_descriptor_timeout};
40
use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
41
42
/// Process-wide cell holding the digest size used for health check data.
/// It is written at most once: either by the setter or by the first read.
static DEFAULT_DIGEST_SIZE_HEALTH_CHECK: OnceLock<usize> = OnceLock::new();

/// Default digest size for health check data. Any change in this value
/// changes the default contract. `GlobalConfig` should be updated to reflect
/// changes in this value.
pub const DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG: usize = 1024 * 1024;

/// Returns the digest size used for health check data.
///
/// If no value has been stored yet, the cell is initialized with
/// [`DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG`] and that value is returned.
pub fn default_digest_size_health_check() -> usize {
    let size: &usize =
        DEFAULT_DIGEST_SIZE_HEALTH_CHECK.get_or_init(|| DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG);
    *size
}
52
53
/// Set the default digest size for health check data, this should be called once.
54
0
pub fn set_default_digest_size_health_check(size: usize) -> Result<(), Error> {
55
0
    DEFAULT_DIGEST_SIZE_HEALTH_CHECK.set(size).map_err(|_| {
56
0
        make_err!(
57
0
            Code::Internal,
58
0
            "set_default_digest_size_health_check already set"
59
0
        )
60
0
    })
61
0
}
62
63
#[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)]
64
pub enum UploadSizeInfo {
65
    /// When the data transfer amount is known to be exact size, this enum should be used.
66
    /// The receiver store can use this to better optimize the way the data is sent or stored.
67
    ExactSize(u64),
68
69
    /// When the data transfer amount is not known to be exact, the caller should use this enum
70
    /// to provide the maximum size that could possibly be sent. This will bypass the exact size
71
    /// checks, but still provide useful information to the underlying store about the data being
72
    /// sent that it can then use to optimize the upload process.
73
    MaxSize(u64),
74
}
75
76
/// Utility to send all the data to the store from a file.
77
// Note: This is not inlined because some code may want to bypass any underlying
78
// optimizations that may be present in the inner store.
79
4
pub async fn slow_update_store_with_file<S: StoreDriver + ?Sized>(
80
4
    store: Pin<&S>,
81
4
    digest: impl Into<StoreKey<'_>>,
82
4
    file: &mut fs::ResumeableFileSlot,
83
4
    upload_size: UploadSizeInfo,
84
4
) -> Result<(), Error> {
85
4
    file.as_writer()
86
4
        .await
87
4
        .err_tip(|| 
"Failed to get writer in upload_file_to_store"0
)
?0
88
4
        .rewind()
89
4
        .await
90
4
        .err_tip(|| 
"Failed to rewind in upload_file_to_store"0
)
?0
;
91
4
    let (tx, rx) = make_buf_channel_pair();
92
4
93
4
    let mut update_fut = store
94
4
        .update(digest.into(), rx, upload_size)
95
4
        .map(|r| r.err_tip(|| 
"Could not upload data to store in upload_file_to_store"0
));
96
4
    let read_result = {
97
4
        let read_data_fut = async {
98
4
            let (_, mut tx) = file
99
4
                .read_buf_cb(
100
4
                    (BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx),
101
67
                    move |(chunk, mut tx)| async move {
102
67
                        tx.send(chunk.freeze())
103
67
                            .await
104
67
                            .err_tip(|| 
"Failed to send in upload_file_to_store"0
)
?0
;
105
67
                        Ok((BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx))
106
134
                    },
107
4
                )
108
4
                .await
109
4
                .err_tip(|| 
"Error in upload_file_to_store::read_buf_cb section"0
)
?0
;
110
4
            tx.send_eof()
111
4
                .err_tip(|| 
"Could not send EOF to store in upload_file_to_store"0
)
?0
;
112
4
            Ok(())
113
4
        };
114
4
        tokio::pin!(read_data_fut);
115
4
        match select(&mut update_fut, read_data_fut).await {
116
0
            Either::Left((update_result, read_data_fut)) => {
117
0
                return update_result.merge(read_data_fut.await)
118
            }
119
4
            Either::Right((read_result, _)) => read_result,
120
        }
121
    };
122
4
    if let Ok(
update_result3
) = timeout(idle_file_descriptor_timeout(), &mut update_fut).await {
  Branch (122:12): [True: 2, False: 1]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [Folded - Ignored]
  Branch (122:12): [True: 1, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [Folded - Ignored]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
123
3
        update_result.merge(read_result)
124
    } else {
125
1
        file.close_file()
126
1
            .await
127
1
            .err_tip(|| 
"Failed to close file in upload_file_to_store"0
)
?0
;
128
1
        update_fut.await.merge(read_result)
129
    }
130
4
}
131
132
/// Optimizations a store may advertise to its callers.
///
/// Callers can query these to take a specialized path when the store is able
/// to optimize how the data is processed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StoreOptimizations {
    /// The store can optimize uploads when it knows the data comes from a file.
    FileUpdates,

    /// The store ignores data uploads.
    NoopUpdates,

    /// The store never serves downloads.
    NoopDownloads,
}
146
147
/// A wrapper struct for [`StoreKey`] to work around
148
/// lifetime limitations in `HashMap::get()` as described in
149
/// <https://github.com/rust-lang/rust/issues/80389>
150
///
151
/// As such this is a wrapper type that is stored in the
152
/// maps using the workaround as described in
153
/// <https://blinsay.com/blog/compound-keys/>
154
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
155
#[repr(transparent)]
156
pub struct StoreKeyBorrow(StoreKey<'static>);
157
158
impl From<StoreKey<'static>> for StoreKeyBorrow {
159
5.79k
    fn from(key: StoreKey<'static>) -> Self {
160
5.79k
        Self(key)
161
5.79k
    }
162
}
163
164
impl From<StoreKeyBorrow> for StoreKey<'static> {
165
0
    fn from(key_borrow: StoreKeyBorrow) -> Self {
166
0
        key_borrow.0
167
0
    }
168
}
169
170
impl<'a> Borrow<StoreKey<'a>> for StoreKeyBorrow {
171
4.70k
    fn borrow(&self) -> &StoreKey<'a> {
172
4.70k
        &self.0
173
4.70k
    }
174
}
175
176
/// Same view as the by-value impl, for a reference to a `StoreKeyBorrow`.
impl<'a> Borrow<StoreKey<'a>> for &StoreKeyBorrow {
    fn borrow(&self) -> &StoreKey<'a> {
        let wrapper: &StoreKeyBorrow = self;
        &wrapper.0
    }
}
181
182
/// Holds something that can be converted into a key the
183
/// store API can understand. Generally this is a digest
184
/// but it can also be a string if the caller wishes to
185
/// store the data directly and reference it by a string
186
/// directly.
187
#[derive(Debug, Eq)]
188
pub enum StoreKey<'a> {
189
    /// A string key.
190
    Str(Cow<'a, str>),
191
192
    /// A key that is a digest.
193
    Digest(DigestInfo),
194
}
195
196
impl<'a> StoreKey<'a> {
197
    /// Creates a new store key from a string.
198
1
    pub const fn new_str(s: &'a str) -> Self {
199
1
        StoreKey::Str(Cow::Borrowed(s))
200
1
    }
201
202
    /// Returns a shallow clone of the key.
203
    /// This is extremely cheap and should be used when clone
204
    /// is needed but the key is not going to be modified.
205
    #[must_use]
206
6.58k
    pub fn borrow(&'a self) -> StoreKey<'a> {
207
34
        match self {
208
21
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Borrowed(s)),
209
13
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Borrowed(s)),
210
6.54k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
211
        }
212
6.58k
    }
213
214
    /// Converts the key into an owned version. This is useful
215
    /// when the caller needs an owned version of the key.
216
7.13k
    pub fn into_owned(self) -> StoreKey<'static> {
217
34
        match self {
218
6
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Owned(s)),
219
28
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Owned(s.to_owned())),
220
7.10k
            StoreKey::Digest(d) => StoreKey::Digest(d),
221
        }
222
7.13k
    }
223
224
    /// Converts the key into a digest. This is useful when the caller
225
    /// must have a digest key. If the data is not a digest, it may
226
    /// hash the underlying key and return a digest of the hash of the key
227
103
    pub fn into_digest(self) -> DigestInfo {
228
103
        match self {
229
101
            StoreKey::Digest(digest) => digest,
230
2
            StoreKey::Str(s) => {
231
2
                let mut hasher = DigestHasherFunc::Blake3.hasher();
232
2
                hasher.update(s.as_bytes());
233
2
                hasher.finalize_digest()
234
            }
235
        }
236
103
    }
237
238
    /// Returns the key as a string. If the key is a digest, it will
239
    /// return a string representation of the digest. If the key is a string,
240
    /// it will return the string itself.
241
37
    pub fn as_str(&'a self) -> Cow<'a, str> {
242
7
        match self {
243
7
            StoreKey::Str(Cow::Owned(s)) => Cow::Borrowed(s),
244
0
            StoreKey::Str(Cow::Borrowed(s)) => Cow::Borrowed(s),
245
30
            StoreKey::Digest(d) => Cow::Owned(format!("{d}")),
246
        }
247
37
    }
248
}
249
250
impl Clone for StoreKey<'static> {
251
5.80k
    fn clone(&self) -> Self {
252
5.80k
        match self {
253
18
            StoreKey::Str(s) => StoreKey::Str(s.clone()),
254
5.78k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
255
        }
256
5.80k
    }
257
}
258
259
impl PartialOrd for StoreKey<'_> {
260
7
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
261
7
        Some(self.cmp(other))
262
7
    }
263
}
264
265
impl Ord for StoreKey<'_> {
266
24
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
267
24
        match (self, other) {
268
24
            (StoreKey::Str(a), StoreKey::Str(b)) => a.cmp(b),
269
0
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a.cmp(b),
270
0
            (StoreKey::Str(_), StoreKey::Digest(_)) => std::cmp::Ordering::Less,
271
0
            (StoreKey::Digest(_), StoreKey::Str(_)) => std::cmp::Ordering::Greater,
272
        }
273
24
    }
274
}
275
276
impl PartialEq for StoreKey<'_> {
277
5.61k
    fn eq(&self, other: &Self) -> bool {
278
5.61k
        match (self, other) {
279
36
            (StoreKey::Str(a), StoreKey::Str(b)) => a == b,
280
5.57k
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a == b,
281
0
            _ => false,
282
        }
283
5.61k
    }
284
}
285
286
impl Hash for StoreKey<'_> {
287
25.1k
    fn hash<H: Hasher>(&self, state: &mut H) {
288
        /// Salts the hash with the enum value that represents
289
        /// the type of the key.
290
        #[repr(u8)]
291
        enum HashId {
292
            Str = 0,
293
            Digest = 1,
294
        }
295
25.1k
        match self {
296
40
            StoreKey::Str(s) => {
297
40
                (HashId::Str as u8).hash(state);
298
40
                s.hash(state);
299
40
            }
300
25.1k
            StoreKey::Digest(d) => {
301
25.1k
                (HashId::Digest as u8).hash(state);
302
25.1k
                d.hash(state);
303
25.1k
            }
304
        }
305
25.1k
    }
306
}
307
308
impl<'a> From<&'a str> for StoreKey<'a> {
309
0
    fn from(s: &'a str) -> Self {
310
0
        StoreKey::Str(Cow::Borrowed(s))
311
0
    }
312
}
313
314
impl From<String> for StoreKey<'static> {
315
0
    fn from(s: String) -> Self {
316
0
        StoreKey::Str(Cow::Owned(s))
317
0
    }
318
}
319
320
/// Builds a digest key from a digest passed by value.
impl From<DigestInfo> for StoreKey<'_> {
    fn from(d: DigestInfo) -> Self {
        Self::Digest(d)
    }
}
325
326
/// Builds a digest key by copying the referenced digest.
impl From<&DigestInfo> for StoreKey<'_> {
    fn from(d: &DigestInfo) -> Self {
        let digest: DigestInfo = *d;
        Self::Digest(digest)
    }
}
331
332
#[derive(Clone, MetricsComponent)]
333
#[repr(transparent)]
334
pub struct Store {
335
    #[metric]
336
    inner: Arc<dyn StoreDriver>,
337
}
338
339
impl Store {
340
237
    pub fn new(inner: Arc<dyn StoreDriver>) -> Self {
341
237
        Self { inner }
342
237
    }
343
}
344
345
impl Store {
346
    /// Returns the immediate inner store driver.
347
    /// Note: This does not recursively try to resolve underlying store drivers
348
    /// like `.inner_store()` does.
349
    #[inline]
350
1
    pub fn into_inner(self) -> Arc<dyn StoreDriver> {
351
1
        self.inner
352
1
    }
353
354
    /// Gets the underlying store for the given digest.
355
    /// A caller might want to use this to obtain a reference to the "real" underlying store
356
    /// (if applicable) and check if it implements some special traits that allow optimizations.
357
    /// Note: If the store performs complex operations on the data, it should return itself.
358
    #[inline]
359
88
    pub fn inner_store<'a, K: Into<StoreKey<'a>>>(&self, digest: Option<K>) -> &dyn StoreDriver {
360
88
        self.inner.inner_store(digest.map(Into::into))
361
88
    }
362
363
    /// Tries to cast the underlying store to the given type.
364
    #[inline]
365
55
    pub fn downcast_ref<U: StoreDriver>(&self, maybe_digest: Option<StoreKey<'_>>) -> Option<&U> {
366
55
        self.inner.inner_store(maybe_digest).as_any().downcast_ref()
367
55
    }
368
369
    /// Register health checks used to monitor the store.
370
    #[inline]
371
0
    pub fn register_health(&self, registry: &mut HealthRegistryBuilder) {
372
0
        self.inner.clone().register_health(registry);
373
0
    }
374
}
375
376
impl StoreLike for Store {
377
    #[inline]
378
9.98k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
379
9.98k
        self.inner.as_ref()
380
9.98k
    }
381
382
7
    fn as_pin(&self) -> Pin<&Self> {
383
7
        Pin::new(self)
384
7
    }
385
}
386
387
impl<T> StoreLike for T
388
where
389
    T: StoreDriver + Sized,
390
{
391
    #[inline]
392
7.90k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
393
7.90k
        self
394
7.90k
    }
395
396
54
    fn as_pin(&self) -> Pin<&Self> {
397
54
        Pin::new(self)
398
54
    }
399
}
400
401
pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
402
    /// Returns the immediate inner store driver.
403
    fn as_store_driver(&self) -> &'_ dyn StoreDriver;
404
405
    /// Utility function to return a pinned reference to self.
406
    fn as_pin(&self) -> Pin<&Self>;
407
408
    /// Utility function to return a pinned reference to the store driver.
409
    #[inline]
410
17.8k
    fn as_store_driver_pin(&self) -> Pin<&'_ dyn StoreDriver> {
411
17.8k
        Pin::new(self.as_store_driver())
412
17.8k
    }
413
414
    /// Look up a digest in the store and return None if it does not exist in
415
    /// the store, or Some(size) if it does.
416
    /// Note: On an AC store the size will be incorrect and should not be used!
417
    #[inline]
418
213
    fn has<'a>(
419
213
        &'a self,
420
213
        digest: impl Into<StoreKey<'a>>,
421
213
    ) -> impl Future<Output = Result<Option<u64>, Error>> + 'a {
422
213
        self.as_store_driver_pin().has(digest.into())
423
213
    }
424
425
    /// Look up a list of digests in the store and return a result for each in
426
    /// the same order as input.  The result will either be None if it does not
427
    /// exist in the store, or Some(size) if it does.
428
    /// Note: On an AC store the size will be incorrect and should not be used!
429
    #[inline]
430
19
    fn has_many<'a>(
431
19
        &'a self,
432
19
        digests: &'a [StoreKey<'a>],
433
19
    ) -> impl Future<Output = Result<Vec<Option<u64>>, Error>> + Send + 'a {
434
19
        self.as_store_driver_pin().has_many(digests)
435
19
    }
436
437
    /// The implementation of the above has and `has_many` functions.  See their
438
    /// documentation for details.
439
    #[inline]
440
40
    fn has_with_results<'a>(
441
40
        &'a self,
442
40
        digests: &'a [StoreKey<'a>],
443
40
        results: &'a mut [Option<u64>],
444
40
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
445
40
        self.as_store_driver_pin()
446
40
            .has_with_results(digests, results)
447
40
    }
448
449
    /// List all the keys in the store that are within the given range.
450
    /// `handler` is called for each key in the range. If `handler` returns
451
    /// false, the listing is stopped.
452
    ///
453
    /// The number of keys passed through the handler is the return value.
454
    #[inline]
455
7
    fn list<'a, 'b>(
456
7
        &'a self,
457
7
        range: impl RangeBounds<StoreKey<'b>> + Send + 'b,
458
7
        mut handler: impl for<'c> FnMut(&'c StoreKey) -> bool + Send + Sync + 'a,
459
7
    ) -> impl Future<Output = Result<u64, Error>> + Send + 'a
460
7
    where
461
7
        'b: 'a,
462
7
    {
463
        // Note: We use a manual async move, so the future can own the `range` and `handler`,
464
        // otherwise we'd require the caller to pass them in by reference making more borrow
465
        // checker noise.
466
7
        async move {
467
7
            self.as_store_driver_pin()
468
7
                .list(
469
7
                    (
470
7
                        range.start_bound().map(StoreKey::borrow),
471
7
                        range.end_bound().map(StoreKey::borrow),
472
7
                    ),
473
7
                    &mut handler,
474
7
                )
475
7
                .await
476
7
        }
477
7
    }
478
479
    /// Sends the data to the store.
480
    #[inline]
481
5.14k
    fn update<'a>(
482
5.14k
        &'a self,
483
5.14k
        digest: impl Into<StoreKey<'a>>,
484
5.14k
        reader: DropCloserReadHalf,
485
5.14k
        upload_size: UploadSizeInfo,
486
5.14k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
487
5.14k
        self.as_store_driver_pin()
488
5.14k
            .update(digest.into(), reader, upload_size)
489
5.14k
    }
490
491
    /// Any optimizations the store might want to expose to the callers.
492
    /// By default, no optimizations are exposed.
493
    #[inline]
494
6
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
495
6
        self.as_store_driver_pin().optimized_for(optimization)
496
6
    }
497
498
    /// Specialized version of `.update()` which takes a `ResumeableFileSlot`.
499
    /// This is useful if the underlying store can optimize the upload process
500
    /// when it knows the data is coming from a file.
501
    #[inline]
502
7
    fn update_with_whole_file<'a>(
503
7
        &'a self,
504
7
        digest: impl Into<StoreKey<'a>>,
505
7
        file: fs::ResumeableFileSlot,
506
7
        upload_size: UploadSizeInfo,
507
7
    ) -> impl Future<Output = Result<Option<fs::ResumeableFileSlot>, Error>> + Send + 'a {
508
7
        self.as_store_driver_pin()
509
7
            .update_with_whole_file(digest.into(), file, upload_size)
510
7
    }
511
512
    /// Utility to send all the data to the store when you have all the bytes.
513
    #[inline]
514
5.66k
    fn update_oneshot<'a>(
515
5.66k
        &'a self,
516
5.66k
        digest: impl Into<StoreKey<'a>>,
517
5.66k
        data: Bytes,
518
5.66k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
519
5.66k
        self.as_store_driver_pin()
520
5.66k
            .update_oneshot(digest.into(), data)
521
5.66k
    }
522
523
    /// Retrieves part of the data from the store and writes it to the given writer.
524
    #[inline]
525
1.27k
    fn get_part<'a>(
526
1.27k
        &'a self,
527
1.27k
        digest: impl Into<StoreKey<'a>>,
528
1.27k
        mut writer: impl BorrowMut<DropCloserWriteHalf> + Send + 'a,
529
1.27k
        offset: u64,
530
1.27k
        length: Option<u64>,
531
1.27k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
532
1.27k
        let key = digest.into();
533
        // Note: We need to capture `writer` just in case the caller
534
        // expects the drop() method to be called on it when the future
535
        // is done due to the complex interaction between the DropCloserWriteHalf
536
        // and the DropCloserReadHalf during drop().
537
1.27k
        async move {
538
1.27k
            self.as_store_driver_pin()
539
1.27k
                .get_part(key, writer.borrow_mut(), offset, length)
540
1.27k
                .await
541
1.27k
        }
542
1.27k
    }
543
544
    /// Utility that works the same as `.get_part()`, but writes all the data.
545
    #[inline]
546
15
    fn get<'a>(
547
15
        &'a self,
548
15
        key: impl Into<StoreKey<'a>>,
549
15
        writer: DropCloserWriteHalf,
550
15
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
551
15
        self.as_store_driver_pin().get(key.into(), writer)
552
15
    }
553
554
    /// Utility that will return all the bytes at once instead of in a streaming manner.
555
    #[inline]
556
5.34k
    fn get_part_unchunked<'a>(
557
5.34k
        &'a self,
558
5.34k
        key: impl Into<StoreKey<'a>>,
559
5.34k
        offset: u64,
560
5.34k
        length: Option<u64>,
561
5.34k
    ) -> impl Future<Output = Result<Bytes, Error>> + Send + 'a {
562
5.34k
        self.as_store_driver_pin()
563
5.34k
            .get_part_unchunked(key.into(), offset, length)
564
5.34k
    }
565
566
    /// Default implementation of the health check. Some stores may want to override this
567
    /// in situations where the default implementation is not sufficient.
568
    #[inline]
569
0
    fn check_health(
570
0
        &self,
571
0
        namespace: Cow<'static, str>,
572
0
    ) -> impl Future<Output = HealthStatus> + Send {
573
0
        self.as_store_driver_pin().check_health(namespace)
574
0
    }
575
}
576
577
#[async_trait]
578
pub trait StoreDriver:
579
    Sync + Send + Unpin + MetricsComponent + HealthStatusIndicator + 'static
580
{
581
    /// See: [`StoreLike::has`] for details.
582
    #[inline]
583
221
    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
584
221
        let mut result = [None];
585
221
        self.has_with_results(&[key], &mut result).await
?0
;
586
221
        Ok(result[0])
587
442
    }
588
589
    /// See: [`StoreLike::has_many`] for details.
590
    #[inline]
591
    async fn has_many(
592
        self: Pin<&Self>,
593
        digests: &[StoreKey<'_>],
594
19
    ) -> Result<Vec<Option<u64>>, Error> {
595
19
        let mut results = vec![None; digests.len()];
596
19
        self.has_with_results(digests, &mut results).await
?0
;
597
19
        Ok(results)
598
38
    }
599
600
    /// See: [`StoreLike::has_with_results`] for details.
601
    async fn has_with_results(
602
        self: Pin<&Self>,
603
        digests: &[StoreKey<'_>],
604
        results: &mut [Option<u64>],
605
    ) -> Result<(), Error>;
606
607
    /// See: [`StoreLike::list`] for details.
608
    async fn list(
609
        self: Pin<&Self>,
610
        _range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
611
        _handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
612
0
    ) -> Result<u64, Error> {
613
        // TODO(allada) We should force all stores to implement this function instead of
614
        // providing a default implementation.
615
0
        Err(make_err!(
616
0
            Code::Unimplemented,
617
0
            "Store::list() not implemented for this store"
618
0
        ))
619
0
    }
620
621
    /// See: [`StoreLike::update`] for details.
622
    async fn update(
623
        self: Pin<&Self>,
624
        key: StoreKey<'_>,
625
        reader: DropCloserReadHalf,
626
        upload_size: UploadSizeInfo,
627
    ) -> Result<(), Error>;
628
629
    /// See: [`StoreLike::optimized_for`] for details.
630
46
    fn optimized_for(&self, _optimization: StoreOptimizations) -> bool {
631
46
        false
632
46
    }
633
634
    /// See: [`StoreLike::update_with_whole_file`] for details.
635
    async fn update_with_whole_file(
636
        self: Pin<&Self>,
637
        key: StoreKey<'_>,
638
        mut file: fs::ResumeableFileSlot,
639
        upload_size: UploadSizeInfo,
640
1
    ) -> Result<Option<fs::ResumeableFileSlot>, Error> {
641
1
        let inner_store = self.inner_store(Some(key.borrow()));
642
1
        if inner_store.optimized_for(StoreOptimizations::FileUpdates) {
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [Folded - Ignored]
  Branch (642:12): [True: 0, False: 1]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [Folded - Ignored]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [True: 0, False: 0]
643
0
            error_if!(
644
0
                addr_eq(inner_store, &*self),
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [Folded - Ignored]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [Folded - Ignored]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
  Branch (644:17): [True: 0, False: 0]
645
                "Store::inner_store() returned self when optimization present"
646
            );
647
0
            return Pin::new(inner_store)
648
0
                .update_with_whole_file(key, file, upload_size)
649
0
                .await;
650
1
        }
651
1
        slow_update_store_with_file(self, key, &mut file, upload_size).await
?0
;
652
1
        Ok(Some(file))
653
2
    }
654
655
    /// See: [`StoreLike::update_oneshot`] for details.
656
5.73k
    async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> {
657
        // TODO(blaise.bruer) This is extremely inefficient, since we have exactly
658
        // what we need here. Maybe we could instead make a version of the stream
659
        // that can take objects already fully in memory instead?
660
5.73k
        let (mut tx, rx) = make_buf_channel_pair();
661
662
5.73k
        let data_len =
663
5.73k
            u64::try_from(data.len()).err_tip(|| 
"Could not convert data.len() to u64"0
)
?0
;
664
5.73k
        let send_fut = async move {
665
5.73k
            // Only send if we are not EOF.
666
5.73k
            if !data.is_empty() {
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 31, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [Folded - Ignored]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 1, False: 0]
  Branch (666:16): [True: 16, False: 40]
  Branch (666:16): [True: 8, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 4, False: 1]
  Branch (666:16): [True: 7, False: 0]
  Branch (666:16): [True: 104, False: 0]
  Branch (666:16): [True: 2, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 2, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 4, False: 0]
  Branch (666:16): [True: 1, False: 0]
  Branch (666:16): [True: 2, False: 0]
  Branch (666:16): [True: 2, False: 0]
  Branch (666:16): [True: 2, False: 0]
  Branch (666:16): [True: 2, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 1, False: 0]
  Branch (666:16): [True: 13, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 406, False: 1]
  Branch (666:16): [Folded - Ignored]
  Branch (666:16): [True: 3, False: 2]
  Branch (666:16): [True: 2, False: 0]
  Branch (666:16): [True: 1, False: 0]
  Branch (666:16): [True: 13, False: 1]
  Branch (666:16): [True: 25, False: 13]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 1, False: 0]
  Branch (666:16): [True: 5.00k, False: 0]
  Branch (666:16): [True: 6, False: 0]
  Branch (666:16): [True: 4, False: 0]
  Branch (666:16): [True: 2, False: 0]
  Branch (666:16): [True: 0, False: 0]
  Branch (666:16): [True: 7, False: 0]
667
5.67k
                tx.send(data)
668
5.67k
                    .await
669
5.67k
                    .err_tip(|| 
"Failed to write data in update_oneshot"0
)
?0
;
670
58
            }
671
5.73k
            tx.send_eof()
672
5.73k
                .err_tip(|| 
"Failed to write EOF in update_oneshot"0
)
?0
;
673
5.73k
            Ok(())
674
5.73k
        };
675
5.73k
        try_join!(
676
5.73k
            send_fut,
677
5.73k
            self.update(key, rx, UploadSizeInfo::ExactSize(data_len))
678
5.73k
        )
?2
;
679
5.73k
        Ok(())
680
11.4k
    }
681
682
    /// See: [`StoreLike::get_part`] for details.
683
    async fn get_part(
684
        self: Pin<&Self>,
685
        key: StoreKey<'_>,
686
        writer: &mut DropCloserWriteHalf,
687
        offset: u64,
688
        length: Option<u64>,
689
    ) -> Result<(), Error>;
690
691
    /// See: [`StoreLike::get`] for details.
692
    #[inline]
693
    async fn get(
694
        self: Pin<&Self>,
695
        key: StoreKey<'_>,
696
        mut writer: DropCloserWriteHalf,
697
19
    ) -> Result<(), Error> {
698
19
        self.get_part(key, &mut writer, 0, None).await
699
37
    }
700
701
    /// See: [`StoreLike::get_part_unchunked`] for details.
702
    async fn get_part_unchunked(
703
        self: Pin<&Self>,
704
        key: StoreKey<'_>,
705
        offset: u64,
706
        length: Option<u64>,
707
5.42k
    ) -> Result<Bytes, Error> {
708
5.42k
        let length_usize = length
709
5.42k
            .map(|v| 
usize::try_from(v).err_tip(2.21k
||
"Could not convert length to usize"0
)2.21k
)
710
5.42k
            .transpose()
?0
;
711
712
        // TODO(blaise.bruer) This is extremely inefficient, since we have exactly
713
        // what we need here. Maybe we could instead make a version of the stream
714
        // that can take objects already fully in memory instead?
715
5.42k
        let (mut tx, mut rx) = make_buf_channel_pair();
716
717
5.42k
        let (data_res, get_part_res) = join!(
718
5.42k
            rx.consume(length_usize),
719
5.42k
            // We use a closure here to ensure that the `tx` is dropped when the
720
5.42k
            // future is done.
721
5.42k
            async move { self.get_part(key, &mut tx, offset, length).await },
722
5.42k
        );
723
5.42k
        get_part_res
724
5.42k
            .err_tip(|| 
"Failed to get_part in get_part_unchunked"9
)
725
5.42k
            .merge(data_res.err_tip(|| 
"Failed to read stream to completion in get_part_unchunked"9
))
726
10.8k
    }
727
728
    /// See: [`StoreLike::check_health`] for details.
729
0
    async fn check_health(self: Pin<&Self>, namespace: Cow<'static, str>) -> HealthStatus {
730
0
        let digest_data_size = default_digest_size_health_check();
731
0
        let mut digest_data = vec![0u8; digest_data_size];
732
0
733
0
        let mut namespace_hasher = StdHasher::new();
734
0
        namespace.hash(&mut namespace_hasher);
735
0
        self.get_name().hash(&mut namespace_hasher);
736
0
        let hash_seed = namespace_hasher.finish();
737
0
738
0
        // Fill the digest data with random data based on a stable
739
0
        // hash of the namespace and store name. Intention is to
740
0
        // have randomly filled data that is unique per store and
741
0
        // does not change between health checks. This is to ensure
742
0
        // we are not adding more data to store on each health check.
743
0
        let mut rng: StdRng = StdRng::seed_from_u64(hash_seed);
744
0
        rng.fill_bytes(&mut digest_data);
745
0
746
0
        let mut digest_hasher = default_digest_hasher_func().hasher();
747
0
        digest_hasher.update(&digest_data);
748
0
        let digest_data_len = digest_data.len() as u64;
749
0
        let digest_info = StoreKey::from(digest_hasher.finalize_digest());
750
0
751
0
        let digest_bytes = bytes::Bytes::copy_from_slice(&digest_data);
752
753
0
        if let Err(e) = self
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [Folded - Ignored]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [Folded - Ignored]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
  Branch (753:16): [True: 0, False: 0]
754
0
            .update_oneshot(digest_info.borrow(), digest_bytes.clone())
755
0
            .await
756
        {
757
0
            return HealthStatus::new_failed(
758
0
                self.get_ref(),
759
0
                format!("Store.update_oneshot() failed: {e}").into(),
760
0
            );
761
0
        }
762
0
763
0
        match self.has(digest_info.borrow()).await {
764
0
            Ok(Some(s)) => {
765
0
                if s != digest_data_len {
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [Folded - Ignored]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [Folded - Ignored]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
  Branch (765:20): [True: 0, False: 0]
766
0
                    return HealthStatus::new_failed(
767
0
                        self.get_ref(),
768
0
                        format!("Store.has() size mismatch {s} != {digest_data_len}").into(),
769
0
                    );
770
0
                }
771
            }
772
            Ok(None) => {
773
0
                return HealthStatus::new_failed(
774
0
                    self.get_ref(),
775
0
                    "Store.has() size not found".into(),
776
0
                );
777
            }
778
0
            Err(e) => {
779
0
                return HealthStatus::new_failed(
780
0
                    self.get_ref(),
781
0
                    format!("Store.has() failed: {e}").into(),
782
0
                );
783
            }
784
        }
785
786
0
        match self
787
0
            .get_part_unchunked(digest_info, 0, Some(digest_data_len))
788
0
            .await
789
        {
790
0
            Ok(b) => {
791
0
                if b != digest_bytes {
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [Folded - Ignored]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [Folded - Ignored]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [True: 0, False: 0]
792
0
                    return HealthStatus::new_failed(
793
0
                        self.get_ref(),
794
0
                        "Store.get_part_unchunked() data mismatch".into(),
795
0
                    );
796
0
                }
797
            }
798
0
            Err(e) => {
799
0
                return HealthStatus::new_failed(
800
0
                    self.get_ref(),
801
0
                    format!("Store.get_part_unchunked() failed: {e}").into(),
802
0
                );
803
            }
804
        }
805
806
0
        HealthStatus::new_ok(self.get_ref(), "Successfully store health check".into())
807
0
    }
808
809
    /// See: [`Store::inner_store`] for details.
810
    fn inner_store(&self, _digest: Option<StoreKey<'_>>) -> &dyn StoreDriver;
811
812
    /// Returns an Any variation of whatever Self is.
813
    fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static);
814
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static>;
815
816
    // Register health checks used to monitor the store.
817
0
    fn register_health(self: Arc<Self>, _registry: &mut HealthRegistryBuilder) {}
818
}
819
820
/// The instructions on how to decode a value from a Bytes & version into
821
/// the underlying type.
822
pub trait SchedulerStoreDecodeTo {
823
    type DecodeOutput;
824
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error>;
825
}
826
827
pub trait SchedulerSubscription: Send + Sync {
828
    fn changed(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
829
}
830
831
pub trait SchedulerSubscriptionManager: Send + Sync {
832
    type Subscription: SchedulerSubscription;
833
834
    fn notify_for_test(&self, value: String);
835
836
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
837
    where
838
        K: SchedulerStoreKeyProvider;
839
}
840
841
/// The API surface for a scheduler store.
842
pub trait SchedulerStore: Send + Sync + 'static {
843
    type SubscriptionManager: SchedulerSubscriptionManager;
844
845
    /// Returns the subscription manager for the scheduler store.
846
    fn subscription_manager(&self) -> Result<Arc<Self::SubscriptionManager>, Error>;
847
848
    /// Updates or inserts an entry into the underlying store.
849
    /// Metadata about the key is attached to the compile-time type.
850
    /// If `StoreKeyProvider::Versioned` is `TrueValue`, the data will not
851
    /// be updated if the current version in the database does not match
852
    /// the version in the passed in data.
853
    /// No guarantees are made about when `Version` is `FalseValue`.
854
    /// Indexes are guaranteed to be updated atomically with the data.
855
    fn update_data<T>(&self, data: T) -> impl Future<Output = Result<Option<u64>, Error>> + Send
856
    where
857
        T: SchedulerStoreDataProvider
858
            + SchedulerStoreKeyProvider
859
            + SchedulerCurrentVersionProvider
860
            + Send;
861
862
    /// Searches for all keys in the store that match the given index prefix.
863
    fn search_by_index_prefix<K>(
864
        &self,
865
        index: K,
866
    ) -> impl Future<
867
        Output = Result<
868
            impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
869
            Error,
870
        >,
871
    > + Send
872
    where
873
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send;
874
875
    /// Returns data for the provided key with the given version if
876
    /// `StoreKeyProvider::Versioned` is `TrueValue`.
877
    fn get_and_decode<K>(
878
        &self,
879
        key: K,
880
    ) -> impl Future<Output = Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>> + Send
881
    where
882
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send;
883
}
884
885
/// A type that is used to let the scheduler store know what
886
/// index is being requested.
887
pub trait SchedulerIndexProvider {
888
    /// Only keys inserted with this prefix will be indexed.
889
    const KEY_PREFIX: &'static str;
890
891
    /// The name of the index.
892
    const INDEX_NAME: &'static str;
893
894
    /// The sort key for the index (if any).
895
    const MAYBE_SORT_KEY: Option<&'static str> = None;
896
897
    /// If the data is versioned.
898
    type Versioned: BoolValue;
899
900
    /// The value of the index.
901
    fn index_value(&self) -> Cow<'_, str>;
902
}
903
904
/// Provides a key to lookup data in the store.
905
pub trait SchedulerStoreKeyProvider {
906
    /// If the data is versioned.
907
    type Versioned: BoolValue;
908
909
    /// Returns the key for the data.
910
    fn get_key(&self) -> StoreKey<'static>;
911
}
912
913
/// Provides data to be stored in the scheduler store.
914
pub trait SchedulerStoreDataProvider {
915
    /// Converts the data into bytes to be stored in the store.
916
    fn try_into_bytes(self) -> Result<Bytes, Error>;
917
918
    /// Returns the indexes for the data if any.
919
1
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
920
1
        Ok(Vec::new())
921
1
    }
922
}
923
924
/// Provides the current version of the data in the store.
925
pub trait SchedulerCurrentVersionProvider {
926
    /// Returns the current version of the data in the store.
927
    fn current_version(&self) -> u64;
928
}
929
930
/// Default implementation for when we are not providing a version
931
/// for the data.
932
impl<T> SchedulerCurrentVersionProvider for T
933
where
934
    T: SchedulerStoreKeyProvider<Versioned = FalseValue>,
935
{
936
0
    fn current_version(&self) -> u64 {
937
0
        0
938
0
    }
939
}
940
941
/// Compile time types for booleans.
942
pub trait BoolValue {
943
    const VALUE: bool;
944
}
945
/// Compile time check if something is false.
946
pub trait IsFalse {}
947
/// Compile time check if something is true.
948
pub trait IsTrue {}
949
950
/// Compile time true value.
951
pub struct TrueValue;
952
impl BoolValue for TrueValue {
953
    const VALUE: bool = true;
954
}
955
impl IsTrue for TrueValue {}
956
957
/// Compile time false value.
958
pub struct FalseValue;
959
impl BoolValue for FalseValue {
960
    const VALUE: bool = false;
961
}
962
impl IsFalse for FalseValue {}