Coverage Report

Created: 2025-03-08 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/store_trait.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::{Borrow, BorrowMut, Cow};
16
use std::collections::hash_map::DefaultHasher as StdHasher;
17
use std::convert::Into;
18
use std::ffi::OsString;
19
use std::hash::{Hash, Hasher};
20
use std::ops::{Bound, RangeBounds};
21
use std::pin::Pin;
22
use std::ptr::addr_eq;
23
use std::sync::{Arc, OnceLock};
24
25
use async_trait::async_trait;
26
use bytes::{Bytes, BytesMut};
27
use futures::{join, try_join, Future, FutureExt, Stream};
28
use nativelink_error::{error_if, make_err, Code, Error, ResultExt};
29
use nativelink_metric::MetricsComponent;
30
use rand::rngs::StdRng;
31
use rand::{RngCore, SeedableRng};
32
use serde::{Deserialize, Serialize};
33
use tokio::io::{AsyncReadExt, AsyncSeekExt};
34
35
use crate::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
36
use crate::common::DigestInfo;
37
use crate::digest_hasher::{default_digest_hasher_func, DigestHasher, DigestHasherFunc};
38
use crate::fs;
39
use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
40
41
static DEFAULT_DIGEST_SIZE_HEALTH_CHECK: OnceLock<usize> = OnceLock::new();
42
/// Default digest size for health check data. Any change in this value
43
/// changes the default contract. `GlobalConfig` should be updated to reflect
44
/// changes in this value.
45
pub const DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG: usize = 1024 * 1024;
46
47
// Get the default digest size for health check data, if value is unset a system wide default is used.
48
0
pub fn default_digest_size_health_check() -> usize {
49
0
    *DEFAULT_DIGEST_SIZE_HEALTH_CHECK.get_or_init(|| DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG)
50
0
}
51
52
/// Set the default digest size for health check data, this should be called once.
53
0
pub fn set_default_digest_size_health_check(size: usize) -> Result<(), Error> {
54
0
    DEFAULT_DIGEST_SIZE_HEALTH_CHECK.set(size).map_err(|_| {
55
0
        make_err!(
56
0
            Code::Internal,
57
0
            "set_default_digest_size_health_check already set"
58
0
        )
59
0
    })
60
0
}
61
62
#[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)]
63
pub enum UploadSizeInfo {
64
    /// When the data transfer amount is known to be exact size, this enum should be used.
65
    /// The receiver store can use this to better optimize the way the data is sent or stored.
66
    ExactSize(u64),
67
68
    /// When the data transfer amount is not known to be exact, the caller should use this enum
69
    /// to provide the maximum size that could possibly be sent. This will bypass the exact size
70
    /// checks, but still provide useful information to the underlying store about the data being
71
    /// sent that it can then use to optimize the upload process.
72
    MaxSize(u64),
73
}
74
75
/// Utility to send all the data to the store from a file.
76
// Note: This is not inlined because some code may want to bypass any underlying
77
// optimizations that may be present in the inner store.
78
8
pub async fn slow_update_store_with_file<S: StoreDriver + ?Sized>(
79
8
    store: Pin<&S>,
80
8
    digest: impl Into<StoreKey<'_>>,
81
8
    file: &mut fs::FileSlot,
82
8
    upload_size: UploadSizeInfo,
83
8
) -> Result<(), Error> {
84
8
    file.rewind()
85
8
        .await
86
8
        .err_tip(|| 
"Failed to rewind in upload_file_to_store"0
)
?0
;
87
8
    let (mut tx, rx) = make_buf_channel_pair();
88
8
89
8
    let update_fut = store
90
8
        .update(digest.into(), rx, upload_size)
91
8
        .map(|r| r.err_tip(|| 
"Could not upload data to store in upload_file_to_store"0
));
92
8
    let read_data_fut = async move {
93
        loop {
94
77
            let mut buf = BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE);
95
77
            let read = file
96
77
                .read_buf(&mut buf)
97
77
                .await
98
77
                .err_tip(|| 
"Failed to read in upload_file_to_store"0
)
?0
;
99
77
            if read == 0 {
  Branch (99:16): [True: 7, False: 5]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [Folded - Ignored]
  Branch (99:16): [True: 1, False: 64]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [Folded - Ignored]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [True: 0, False: 0]
100
8
                break;
101
69
            }
102
69
            tx.send(buf.freeze())
103
69
                .await
104
69
                .err_tip(|| 
"Failed to send in upload_file_to_store"0
)
?0
;
105
        }
106
8
        tx.send_eof()
107
8
            .err_tip(|| 
"Could not send EOF to store in upload_file_to_store"0
)
108
8
    };
109
8
    tokio::pin!(read_data_fut);
110
8
    let (update_res, read_res) = tokio::join!(update_fut, read_data_fut);
111
8
    update_res.merge(read_res)
112
8
}
113
114
/// Optimizations that stores may want to expose to the callers.
115
/// This is useful for specific cases when the store can optimize the processing
116
/// of the data being processed.
117
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
118
pub enum StoreOptimizations {
119
    /// The store can optimize the upload process when it knows the data is coming from a file.
120
    FileUpdates,
121
122
    /// If the store will ignore the data uploads.
123
    NoopUpdates,
124
125
    /// If the store will never serve downloads.
126
    NoopDownloads,
127
}
128
129
/// A wrapper struct for [`StoreKey`] to work around
130
/// lifetime limitations in `HashMap::get()` as described in
131
/// <https://github.com/rust-lang/rust/issues/80389>
132
///
133
/// As such this is a wrapper type that is stored in the
134
/// maps using the workaround as described in
135
/// <https://blinsay.com/blog/compound-keys/>
136
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
137
#[repr(transparent)]
138
pub struct StoreKeyBorrow(StoreKey<'static>);
139
140
impl From<StoreKey<'static>> for StoreKeyBorrow {
141
5.86k
    fn from(key: StoreKey<'static>) -> Self {
142
5.86k
        Self(key)
143
5.86k
    }
144
}
145
146
impl From<StoreKeyBorrow> for StoreKey<'static> {
147
0
    fn from(key_borrow: StoreKeyBorrow) -> Self {
148
0
        key_borrow.0
149
0
    }
150
}
151
152
impl<'a> Borrow<StoreKey<'a>> for StoreKeyBorrow {
153
4.64k
    fn borrow(&self) -> &StoreKey<'a> {
154
4.64k
        &self.0
155
4.64k
    }
156
}
157
158
impl<'a> Borrow<StoreKey<'a>> for &StoreKeyBorrow {
159
0
    fn borrow(&self) -> &StoreKey<'a> {
160
0
        &self.0
161
0
    }
162
}
163
164
/// Holds something that can be converted into a key the
165
/// store API can understand. Generally this is a digest
166
/// but it can also be a string if the caller wishes to
167
/// store the data directly and reference it by a string
168
/// directly.
169
#[derive(Debug, Eq)]
170
pub enum StoreKey<'a> {
171
    /// A string key.
172
    Str(Cow<'a, str>),
173
174
    /// A key that is a digest.
175
    Digest(DigestInfo),
176
}
177
178
impl<'a> StoreKey<'a> {
179
    /// Creates a new store key from a string.
180
1
    pub const fn new_str(s: &'a str) -> Self {
181
1
        StoreKey::Str(Cow::Borrowed(s))
182
1
    }
183
184
    /// Returns a shallow clone of the key.
185
    /// This is extremely cheap and should be used when clone
186
    /// is needed but the key is not going to be modified.
187
    #[must_use]
188
6.79k
    pub fn borrow(&'a self) -> StoreKey<'a> {
189
34
        match self {
190
21
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Borrowed(s)),
191
13
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Borrowed(s)),
192
6.76k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
193
        }
194
6.79k
    }
195
196
    /// Converts the key into an owned version. This is useful
197
    /// when the caller needs an owned version of the key.
198
7.24k
    pub fn into_owned(self) -> StoreKey<'static> {
199
34
        match self {
200
6
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Owned(s)),
201
28
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Owned(s.to_owned())),
202
7.21k
            StoreKey::Digest(d) => StoreKey::Digest(d),
203
        }
204
7.24k
    }
205
206
    /// Converts the key into a digest. This is useful when the caller
207
    /// must have a digest key. If the data is not a digest, it may
208
    /// hash the underlying key and return a digest of the hash of the key
209
147
    pub fn into_digest(self) -> DigestInfo {
210
147
        match self {
211
145
            StoreKey::Digest(digest) => digest,
212
2
            StoreKey::Str(s) => {
213
2
                let mut hasher = DigestHasherFunc::Blake3.hasher();
214
2
                hasher.update(s.as_bytes());
215
2
                hasher.finalize_digest()
216
            }
217
        }
218
147
    }
219
220
    /// Returns the key as a string. If the key is a digest, it will
221
    /// return a string representation of the digest. If the key is a string,
222
    /// it will return the string itself.
223
37
    pub fn as_str(&'a self) -> Cow<'a, str> {
224
7
        match self {
225
7
            StoreKey::Str(Cow::Owned(s)) => Cow::Borrowed(s),
226
0
            StoreKey::Str(Cow::Borrowed(s)) => Cow::Borrowed(s),
227
30
            StoreKey::Digest(d) => Cow::Owned(format!("{d}")),
228
        }
229
37
    }
230
}
231
232
impl Clone for StoreKey<'static> {
233
5.86k
    fn clone(&self) -> Self {
234
5.86k
        match self {
235
18
            StoreKey::Str(s) => StoreKey::Str(s.clone()),
236
5.85k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
237
        }
238
5.86k
    }
239
}
240
241
impl PartialOrd for StoreKey<'_> {
242
7
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
243
7
        Some(self.cmp(other))
244
7
    }
245
}
246
247
impl Ord for StoreKey<'_> {
248
24
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
249
24
        match (self, other) {
250
24
            (StoreKey::Str(a), StoreKey::Str(b)) => a.cmp(b),
251
0
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a.cmp(b),
252
0
            (StoreKey::Str(_), StoreKey::Digest(_)) => std::cmp::Ordering::Less,
253
0
            (StoreKey::Digest(_), StoreKey::Str(_)) => std::cmp::Ordering::Greater,
254
        }
255
24
    }
256
}
257
258
impl PartialEq for StoreKey<'_> {
259
5.59k
    fn eq(&self, other: &Self) -> bool {
260
5.59k
        match (self, other) {
261
36
            (StoreKey::Str(a), StoreKey::Str(b)) => a == b,
262
5.56k
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a == b,
263
0
            _ => false,
264
        }
265
5.59k
    }
266
}
267
268
impl Hash for StoreKey<'_> {
269
25.2k
    fn hash<H: Hasher>(&self, state: &mut H) {
270
        /// Salts the hash with the enum value that represents
271
        /// the type of the key.
272
        #[repr(u8)]
273
        enum HashId {
274
            Str = 0,
275
            Digest = 1,
276
        }
277
25.2k
        match self {
278
40
            StoreKey::Str(s) => {
279
40
                (HashId::Str as u8).hash(state);
280
40
                s.hash(state);
281
40
            }
282
25.1k
            StoreKey::Digest(d) => {
283
25.1k
                (HashId::Digest as u8).hash(state);
284
25.1k
                d.hash(state);
285
25.1k
            }
286
        }
287
25.2k
    }
288
}
289
290
impl<'a> From<&'a str> for StoreKey<'a> {
291
0
    fn from(s: &'a str) -> Self {
292
0
        StoreKey::Str(Cow::Borrowed(s))
293
0
    }
294
}
295
296
impl From<String> for StoreKey<'static> {
297
0
    fn from(s: String) -> Self {
298
0
        StoreKey::Str(Cow::Owned(s))
299
0
    }
300
}
301
302
impl From<DigestInfo> for StoreKey<'_> {
303
10.4k
    fn from(d: DigestInfo) -> Self {
304
10.4k
        StoreKey::Digest(d)
305
10.4k
    }
306
}
307
308
impl From<&DigestInfo> for StoreKey<'_> {
309
39
    fn from(d: &DigestInfo) -> Self {
310
39
        StoreKey::Digest(*d)
311
39
    }
312
}
313
314
#[derive(Clone, MetricsComponent)]
315
#[repr(transparent)]
316
pub struct Store {
317
    #[metric]
318
    inner: Arc<dyn StoreDriver>,
319
}
320
321
impl Store {
322
267
    pub fn new(inner: Arc<dyn StoreDriver>) -> Self {
323
267
        Self { inner }
324
267
    }
325
}
326
327
impl Store {
328
    /// Returns the immediate inner store driver.
329
    /// Note: This does not recursively try to resolve underlying store drivers
330
    /// like `.inner_store()` does.
331
    #[inline]
332
1
    pub fn into_inner(self) -> Arc<dyn StoreDriver> {
333
1
        self.inner
334
1
    }
335
336
    /// Gets the underlying store for the given digest.
337
    /// A caller might want to use this to obtain a reference to the "real" underlying store
338
    /// (if applicable) and check if it implements some special traits that allow optimizations.
339
    /// Note: If the store performs complex operations on the data, it should return itself.
340
    #[inline]
341
164
    pub fn inner_store<'a, K: Into<StoreKey<'a>>>(&self, digest: Option<K>) -> &dyn StoreDriver {
342
164
        self.inner.inner_store(digest.map(Into::into))
343
164
    }
344
345
    /// Tries to cast the underlying store to the given type.
346
    #[inline]
347
64
    pub fn downcast_ref<U: StoreDriver>(&self, maybe_digest: Option<StoreKey<'_>>) -> Option<&U> {
348
64
        self.inner.inner_store(maybe_digest).as_any().downcast_ref()
349
64
    }
350
351
    /// Register health checks used to monitor the store.
352
    #[inline]
353
0
    pub fn register_health(&self, registry: &mut HealthRegistryBuilder) {
354
0
        self.inner.clone().register_health(registry);
355
0
    }
356
}
357
358
impl StoreLike for Store {
359
    #[inline]
360
10.0k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
361
10.0k
        self.inner.as_ref()
362
10.0k
    }
363
364
7
    fn as_pin(&self) -> Pin<&Self> {
365
7
        Pin::new(self)
366
7
    }
367
}
368
369
impl<T> StoreLike for T
370
where
371
    T: StoreDriver + Sized,
372
{
373
    #[inline]
374
7.97k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
375
7.97k
        self
376
7.97k
    }
377
378
84
    fn as_pin(&self) -> Pin<&Self> {
379
84
        Pin::new(self)
380
84
    }
381
}
382
383
pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
384
    /// Returns the immediate inner store driver.
385
    fn as_store_driver(&self) -> &'_ dyn StoreDriver;
386
387
    /// Utility function to return a pinned reference to self.
388
    fn as_pin(&self) -> Pin<&Self>;
389
390
    /// Utility function to return a pinned reference to the store driver.
391
    #[inline]
392
17.9k
    fn as_store_driver_pin(&self) -> Pin<&'_ dyn StoreDriver> {
393
17.9k
        Pin::new(self.as_store_driver())
394
17.9k
    }
395
396
    /// Look up a digest in the store and return None if it does not exist in
397
    /// the store, or Some(size) if it does.
398
    /// Note: On an AC store the size will be incorrect and should not be used!
399
    #[inline]
400
223
    fn has<'a>(
401
223
        &'a self,
402
223
        digest: impl Into<StoreKey<'a>>,
403
223
    ) -> impl Future<Output = Result<Option<u64>, Error>> + 'a {
404
223
        self.as_store_driver_pin().has(digest.into())
405
223
    }
406
407
    /// Look up a list of digests in the store and return a result for each in
408
    /// the same order as input.  The result will either be None if it does not
409
    /// exist in the store, or Some(size) if it does.
410
    /// Note: On an AC store the size will be incorrect and should not be used!
411
    #[inline]
412
19
    fn has_many<'a>(
413
19
        &'a self,
414
19
        digests: &'a [StoreKey<'a>],
415
19
    ) -> impl Future<Output = Result<Vec<Option<u64>>, Error>> + Send + 'a {
416
19
        self.as_store_driver_pin().has_many(digests)
417
19
    }
418
419
    /// The implementation of the above has and `has_many` functions.  See their
420
    /// documentation for details.
421
    #[inline]
422
40
    fn has_with_results<'a>(
423
40
        &'a self,
424
40
        digests: &'a [StoreKey<'a>],
425
40
        results: &'a mut [Option<u64>],
426
40
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
427
40
        self.as_store_driver_pin()
428
40
            .has_with_results(digests, results)
429
40
    }
430
431
    /// List all the keys in the store that are within the given range.
432
    /// `handler` is called for each key in the range. If `handler` returns
433
    /// false, the listing is stopped.
434
    ///
435
    /// The number of keys passed through the handler is the return value.
436
    #[inline]
437
7
    fn list<'a, 'b>(
438
7
        &'a self,
439
7
        range: impl RangeBounds<StoreKey<'b>> + Send + 'b,
440
7
        mut handler: impl for<'c> FnMut(&'c StoreKey) -> bool + Send + Sync + 'a,
441
7
    ) -> impl Future<Output = Result<u64, Error>> + Send + 'a
442
7
    where
443
7
        'b: 'a,
444
7
    {
445
        // Note: We use a manual async move, so the future can own the `range` and `handler`,
446
        // otherwise we'd require the caller to pass them in by reference making more borrow
447
        // checker noise.
448
7
        async move {
449
7
            self.as_store_driver_pin()
450
7
                .list(
451
7
                    (
452
7
                        range.start_bound().map(StoreKey::borrow),
453
7
                        range.end_bound().map(StoreKey::borrow),
454
7
                    ),
455
7
                    &mut handler,
456
7
                )
457
7
                .await
458
7
        }
459
7
    }
460
461
    /// Sends the data to the store.
462
    #[inline]
463
5.21k
    fn update<'a>(
464
5.21k
        &'a self,
465
5.21k
        digest: impl Into<StoreKey<'a>>,
466
5.21k
        reader: DropCloserReadHalf,
467
5.21k
        upload_size: UploadSizeInfo,
468
5.21k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
469
5.21k
        self.as_store_driver_pin()
470
5.21k
            .update(digest.into(), reader, upload_size)
471
5.21k
    }
472
473
    /// Any optimizations the store might want to expose to the callers.
474
    /// By default, no optimizations are exposed.
475
    #[inline]
476
14
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
477
14
        self.as_store_driver_pin().optimized_for(optimization)
478
14
    }
479
480
    /// Specialized version of `.update()` which takes a `FileSlot`.
481
    /// This is useful if the underlying store can optimize the upload process
482
    /// when it knows the data is coming from a file.
483
    #[inline]
484
10
    fn update_with_whole_file<'a>(
485
10
        &'a self,
486
10
        digest: impl Into<StoreKey<'a>>,
487
10
        path: OsString,
488
10
        file: fs::FileSlot,
489
10
        upload_size: UploadSizeInfo,
490
10
    ) -> impl Future<Output = Result<Option<fs::FileSlot>, Error>> + Send + 'a {
491
10
        self.as_store_driver_pin()
492
10
            .update_with_whole_file(digest.into(), path, file, upload_size)
493
10
    }
494
495
    /// Utility to send all the data to the store when you have all the bytes.
496
    #[inline]
497
5.66k
    fn update_oneshot<'a>(
498
5.66k
        &'a self,
499
5.66k
        digest: impl Into<StoreKey<'a>>,
500
5.66k
        data: Bytes,
501
5.66k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
502
5.66k
        self.as_store_driver_pin()
503
5.66k
            .update_oneshot(digest.into(), data)
504
5.66k
    }
505
506
    /// Retrieves part of the data from the store and writes it to the given writer.
507
    #[inline]
508
1.30k
    fn get_part<'a>(
509
1.30k
        &'a self,
510
1.30k
        digest: impl Into<StoreKey<'a>>,
511
1.30k
        mut writer: impl BorrowMut<DropCloserWriteHalf> + Send + 'a,
512
1.30k
        offset: u64,
513
1.30k
        length: Option<u64>,
514
1.30k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
515
1.30k
        let key = digest.into();
516
        // Note: We need to capture `writer` just in case the caller
517
        // expects the drop() method to be called on it when the future
518
        // is done due to the complex interaction between the DropCloserWriteHalf
519
        // and the DropCloserReadHalf during drop().
520
1.30k
        async move {
521
1.30k
            self.as_store_driver_pin()
522
1.30k
                .get_part(key, writer.borrow_mut(), offset, length)
523
1.30k
                .await
524
1.30k
        }
525
1.30k
    }
526
527
    /// Utility that works the same as `.get_part()`, but writes all the data.
528
    #[inline]
529
15
    fn get<'a>(
530
15
        &'a self,
531
15
        key: impl Into<StoreKey<'a>>,
532
15
        writer: DropCloserWriteHalf,
533
15
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
534
15
        self.as_store_driver_pin().get(key.into(), writer)
535
15
    }
536
537
    /// Utility that will return all the bytes at once instead of in a streaming manner.
538
    #[inline]
539
5.25k
    fn get_part_unchunked<'a>(
540
5.25k
        &'a self,
541
5.25k
        key: impl Into<StoreKey<'a>>,
542
5.25k
        offset: u64,
543
5.25k
        length: Option<u64>,
544
5.25k
    ) -> impl Future<Output = Result<Bytes, Error>> + Send + 'a {
545
5.25k
        self.as_store_driver_pin()
546
5.25k
            .get_part_unchunked(key.into(), offset, length)
547
5.25k
    }
548
549
    /// Default implementation of the health check. Some stores may want to override this
550
    /// in situations where the default implementation is not sufficient.
551
    #[inline]
552
0
    fn check_health(
553
0
        &self,
554
0
        namespace: Cow<'static, str>,
555
0
    ) -> impl Future<Output = HealthStatus> + Send {
556
0
        self.as_store_driver_pin().check_health(namespace)
557
0
    }
558
}
559
560
#[async_trait]
561
pub trait StoreDriver:
562
    Sync + Send + Unpin + MetricsComponent + HealthStatusIndicator + 'static
563
{
564
    /// See: [`StoreLike::has`] for details.
565
    #[inline]
566
239
    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
567
239
        let mut result = [None];
568
239
        self.has_with_results(&[key], &mut result).await
?0
;
569
239
        Ok(result[0])
570
478
    }
571
572
    /// See: [`StoreLike::has_many`] for details.
573
    #[inline]
574
    async fn has_many(
575
        self: Pin<&Self>,
576
        digests: &[StoreKey<'_>],
577
19
    ) -> Result<Vec<Option<u64>>, Error> {
578
19
        let mut results = vec![None; digests.len()];
579
19
        self.has_with_results(digests, &mut results).await
?0
;
580
19
        Ok(results)
581
38
    }
582
583
    /// See: [`StoreLike::has_with_results`] for details.
584
    async fn has_with_results(
585
        self: Pin<&Self>,
586
        digests: &[StoreKey<'_>],
587
        results: &mut [Option<u64>],
588
    ) -> Result<(), Error>;
589
590
    /// See: [`StoreLike::list`] for details.
591
    async fn list(
592
        self: Pin<&Self>,
593
        _range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
594
        _handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
595
0
    ) -> Result<u64, Error> {
596
        // TODO(allada) We should force all stores to implement this function instead of
597
        // providing a default implementation.
598
0
        Err(make_err!(
599
0
            Code::Unimplemented,
600
0
            "Store::list() not implemented for this store"
601
0
        ))
602
0
    }
603
604
    /// See: [`StoreLike::update`] for details.
605
    async fn update(
606
        self: Pin<&Self>,
607
        key: StoreKey<'_>,
608
        reader: DropCloserReadHalf,
609
        upload_size: UploadSizeInfo,
610
    ) -> Result<(), Error>;
611
612
    /// See: [`StoreLike::optimized_for`] for details.
613
90
    fn optimized_for(&self, _optimization: StoreOptimizations) -> bool {
614
90
        false
615
90
    }
616
617
    /// See: [`StoreLike::update_with_whole_file`] for details.
618
    async fn update_with_whole_file(
619
        self: Pin<&Self>,
620
        key: StoreKey<'_>,
621
        path: OsString,
622
        mut file: fs::FileSlot,
623
        upload_size: UploadSizeInfo,
624
1
    ) -> Result<Option<fs::FileSlot>, Error> {
625
1
        let inner_store = self.inner_store(Some(key.borrow()));
626
1
        if inner_store.optimized_for(StoreOptimizations::FileUpdates) {
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [Folded - Ignored]
  Branch (626:12): [True: 0, False: 1]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [Folded - Ignored]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
  Branch (626:12): [True: 0, False: 0]
627
0
            error_if!(
628
0
                addr_eq(inner_store, &*self),
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [Folded - Ignored]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [Folded - Ignored]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
  Branch (628:17): [True: 0, False: 0]
629
                "Store::inner_store() returned self when optimization present"
630
            );
631
0
            return Pin::new(inner_store)
632
0
                .update_with_whole_file(key, path, file, upload_size)
633
0
                .await;
634
1
        }
635
1
        slow_update_store_with_file(self, key, &mut file, upload_size).await
?0
;
636
1
        Ok(Some(file))
637
2
    }
638
639
    /// See: [`StoreLike::update_oneshot`] for details.
640
5.75k
    async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> {
641
        // TODO(blaise.bruer) This is extremely inefficient, since we have exactly
642
        // what we need here. Maybe we could instead make a version of the stream
643
        // that can take objects already fully in memory instead?
644
5.75k
        let (mut tx, rx) = make_buf_channel_pair();
645
646
5.75k
        let data_len =
647
5.75k
            u64::try_from(data.len()).err_tip(|| 
"Could not convert data.len() to u64"0
)
?0
;
648
5.75k
        let send_fut = async move {
649
5.75k
            // Only send if we are not EOF.
650
5.75k
            if !data.is_empty() {
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 31, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [Folded - Ignored]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 1, False: 0]
  Branch (650:16): [True: 16, False: 40]
  Branch (650:16): [True: 8, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 4, False: 1]
  Branch (650:16): [True: 7, False: 0]
  Branch (650:16): [True: 90, False: 0]
  Branch (650:16): [True: 2, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 2, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 4, False: 0]
  Branch (650:16): [True: 1, False: 0]
  Branch (650:16): [True: 2, False: 0]
  Branch (650:16): [True: 2, False: 0]
  Branch (650:16): [True: 2, False: 0]
  Branch (650:16): [True: 2, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 11, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 406, False: 1]
  Branch (650:16): [Folded - Ignored]
  Branch (650:16): [True: 3, False: 2]
  Branch (650:16): [True: 2, False: 0]
  Branch (650:16): [True: 1, False: 0]
  Branch (650:16): [True: 13, False: 1]
  Branch (650:16): [True: 47, False: 30]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 1, False: 0]
  Branch (650:16): [True: 5.00k, False: 0]
  Branch (650:16): [True: 6, False: 0]
  Branch (650:16): [True: 4, False: 0]
  Branch (650:16): [True: 2, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 7, False: 0]
651
5.68k
                tx.send(data)
652
5.68k
                    .await
653
5.68k
                    .err_tip(|| 
"Failed to write data in update_oneshot"0
)
?0
;
654
75
            }
655
5.75k
            tx.send_eof()
656
5.75k
                .err_tip(|| 
"Failed to write EOF in update_oneshot"0
)
?0
;
657
5.75k
            Ok(())
658
5.75k
        };
659
5.75k
        try_join!(
660
5.75k
            send_fut,
661
5.75k
            self.update(key, rx, UploadSizeInfo::ExactSize(data_len))
662
5.75k
        )
?2
;
663
5.75k
        Ok(())
664
11.5k
    }
665
666
    /// See: [`StoreLike::get_part`] for details.
667
    async fn get_part(
668
        self: Pin<&Self>,
669
        key: StoreKey<'_>,
670
        writer: &mut DropCloserWriteHalf,
671
        offset: u64,
672
        length: Option<u64>,
673
    ) -> Result<(), Error>;
674
675
    /// See: [`StoreLike::get`] for details.
676
    #[inline]
677
    async fn get(
678
        self: Pin<&Self>,
679
        key: StoreKey<'_>,
680
        mut writer: DropCloserWriteHalf,
681
19
    ) -> Result<(), Error> {
682
19
        self.get_part(key, &mut writer, 0, None).await
683
37
    }
684
685
    /// See: [`StoreLike::get_part_unchunked`] for details.
686
    async fn get_part_unchunked(
687
        self: Pin<&Self>,
688
        key: StoreKey<'_>,
689
        offset: u64,
690
        length: Option<u64>,
691
5.35k
    ) -> Result<Bytes, Error> {
692
5.35k
        let length_usize = length
693
5.35k
            .map(|v| 
usize::try_from(v).err_tip(2.24k
||
"Could not convert length to usize"0
)2.24k
)
694
5.35k
            .transpose()
?0
;
695
696
        // TODO(blaise.bruer) This is extremely inefficient, since we have exactly
697
        // what we need here. Maybe we could instead make a version of the stream
698
        // that can take objects already fully in memory instead?
699
5.35k
        let (mut tx, mut rx) = make_buf_channel_pair();
700
701
5.35k
        let (data_res, get_part_res) = join!(
702
5.35k
            rx.consume(length_usize),
703
5.35k
            // We use a closure here to ensure that the `tx` is dropped when the
704
5.35k
            // future is done.
705
5.35k
            async move { self.get_part(key, &mut tx, offset, length).await },
706
5.35k
        );
707
5.35k
        get_part_res
708
5.35k
            .err_tip(|| 
"Failed to get_part in get_part_unchunked"11
)
709
5.35k
            .merge(data_res.err_tip(|| 
"Failed to read stream to completion in get_part_unchunked"11
))
710
10.7k
    }
711
712
    /// See: [`StoreLike::check_health`] for details.
713
0
    async fn check_health(self: Pin<&Self>, namespace: Cow<'static, str>) -> HealthStatus {
714
0
        let digest_data_size = default_digest_size_health_check();
715
0
        let mut digest_data = vec![0u8; digest_data_size];
716
0
717
0
        let mut namespace_hasher = StdHasher::new();
718
0
        namespace.hash(&mut namespace_hasher);
719
0
        self.get_name().hash(&mut namespace_hasher);
720
0
        let hash_seed = namespace_hasher.finish();
721
0
722
0
        // Fill the digest data with random data based on a stable
723
0
        // hash of the namespace and store name. Intention is to
724
0
        // have randomly filled data that is unique per store and
725
0
        // does not change between health checks. This is to ensure
726
0
        // we are not adding more data to store on each health check.
727
0
        let mut rng: StdRng = StdRng::seed_from_u64(hash_seed);
728
0
        rng.fill_bytes(&mut digest_data);
729
0
730
0
        let mut digest_hasher = default_digest_hasher_func().hasher();
731
0
        digest_hasher.update(&digest_data);
732
0
        let digest_data_len = digest_data.len() as u64;
733
0
        let digest_info = StoreKey::from(digest_hasher.finalize_digest());
734
0
735
0
        let digest_bytes = bytes::Bytes::copy_from_slice(&digest_data);
736
737
0
        if let Err(e) = self
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [Folded - Ignored]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [Folded - Ignored]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
  Branch (737:16): [True: 0, False: 0]
738
0
            .update_oneshot(digest_info.borrow(), digest_bytes.clone())
739
0
            .await
740
        {
741
0
            return HealthStatus::new_failed(
742
0
                self.get_ref(),
743
0
                format!("Store.update_oneshot() failed: {e}").into(),
744
0
            );
745
0
        }
746
0
747
0
        match self.has(digest_info.borrow()).await {
748
0
            Ok(Some(s)) => {
749
0
                if s != digest_data_len {
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [Folded - Ignored]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [Folded - Ignored]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
  Branch (749:20): [True: 0, False: 0]
750
0
                    return HealthStatus::new_failed(
751
0
                        self.get_ref(),
752
0
                        format!("Store.has() size mismatch {s} != {digest_data_len}").into(),
753
0
                    );
754
0
                }
755
            }
756
            Ok(None) => {
757
0
                return HealthStatus::new_failed(
758
0
                    self.get_ref(),
759
0
                    "Store.has() size not found".into(),
760
0
                );
761
            }
762
0
            Err(e) => {
763
0
                return HealthStatus::new_failed(
764
0
                    self.get_ref(),
765
0
                    format!("Store.has() failed: {e}").into(),
766
0
                );
767
            }
768
        }
769
770
0
        match self
771
0
            .get_part_unchunked(digest_info, 0, Some(digest_data_len))
772
0
            .await
773
        {
774
0
            Ok(b) => {
775
0
                if b != digest_bytes {
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [Folded - Ignored]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [Folded - Ignored]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
  Branch (775:20): [True: 0, False: 0]
776
0
                    return HealthStatus::new_failed(
777
0
                        self.get_ref(),
778
0
                        "Store.get_part_unchunked() data mismatch".into(),
779
0
                    );
780
0
                }
781
            }
782
0
            Err(e) => {
783
0
                return HealthStatus::new_failed(
784
0
                    self.get_ref(),
785
0
                    format!("Store.get_part_unchunked() failed: {e}").into(),
786
0
                );
787
            }
788
        }
789
790
0
        HealthStatus::new_ok(self.get_ref(), "Successfully store health check".into())
791
0
    }
792
793
    /// See: [`Store::inner_store`] for details.
794
    fn inner_store(&self, _digest: Option<StoreKey<'_>>) -> &dyn StoreDriver;
795
796
    /// Returns an Any variation of whatever Self is.
797
    fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static);
798
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static>;
799
800
    // Register health checks used to monitor the store.
801
0
    fn register_health(self: Arc<Self>, _registry: &mut HealthRegistryBuilder) {}
802
}
803
804
/// The instructions on how to decode a value from a Bytes & version into
805
/// the underlying type.
806
pub trait SchedulerStoreDecodeTo {
807
    type DecodeOutput;
808
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error>;
809
}
810
811
pub trait SchedulerSubscription: Send + Sync {
812
    fn changed(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
813
}
814
815
pub trait SchedulerSubscriptionManager: Send + Sync {
816
    type Subscription: SchedulerSubscription;
817
818
    fn notify_for_test(&self, value: String);
819
820
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
821
    where
822
        K: SchedulerStoreKeyProvider;
823
}
824
825
/// The API surface for a scheduler store.
826
pub trait SchedulerStore: Send + Sync + 'static {
827
    type SubscriptionManager: SchedulerSubscriptionManager;
828
829
    /// Returns the subscription manager for the scheduler store.
830
    fn subscription_manager(&self) -> Result<Arc<Self::SubscriptionManager>, Error>;
831
832
    /// Updates or inserts an entry into the underlying store.
833
    /// Metadata about the key is attached to the compile-time type.
834
    /// If `StoreKeyProvider::Versioned` is `TrueValue`, the data will not
835
    /// be updated if the current version in the database does not match
836
    /// the version in the passed in data.
837
    /// No guarantees are made about when `Version` is `FalseValue`.
838
    /// Indexes are guaranteed to be updated atomically with the data.
839
    fn update_data<T>(&self, data: T) -> impl Future<Output = Result<Option<u64>, Error>> + Send
840
    where
841
        T: SchedulerStoreDataProvider
842
            + SchedulerStoreKeyProvider
843
            + SchedulerCurrentVersionProvider
844
            + Send;
845
846
    /// Searches for all keys in the store that match the given index prefix.
847
    fn search_by_index_prefix<K>(
848
        &self,
849
        index: K,
850
    ) -> impl Future<
851
        Output = Result<
852
            impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
853
            Error,
854
        >,
855
    > + Send
856
    where
857
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send;
858
859
    /// Returns data for the provided key with the given version if
860
    /// `StoreKeyProvider::Versioned` is `TrueValue`.
861
    fn get_and_decode<K>(
862
        &self,
863
        key: K,
864
    ) -> impl Future<Output = Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>> + Send
865
    where
866
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send;
867
}
868
869
/// A type that is used to let the scheduler store know what
870
/// index is being requested.
871
pub trait SchedulerIndexProvider {
872
    /// Only keys inserted with this prefix will be indexed.
873
    const KEY_PREFIX: &'static str;
874
875
    /// The name of the index.
876
    const INDEX_NAME: &'static str;
877
878
    /// The sort key for the index (if any).
879
    const MAYBE_SORT_KEY: Option<&'static str> = None;
880
881
    /// If the data is versioned.
882
    type Versioned: BoolValue;
883
884
    /// The value of the index.
885
    fn index_value(&self) -> Cow<'_, str>;
886
}
887
888
/// Provides a key to lookup data in the store.
889
pub trait SchedulerStoreKeyProvider {
890
    /// If the data is versioned.
891
    type Versioned: BoolValue;
892
893
    /// Returns the key for the data.
894
    fn get_key(&self) -> StoreKey<'static>;
895
}
896
897
/// Provides data to be stored in the scheduler store.
898
pub trait SchedulerStoreDataProvider {
899
    /// Converts the data into bytes to be stored in the store.
900
    fn try_into_bytes(self) -> Result<Bytes, Error>;
901
902
    /// Returns the indexes for the data if any.
903
1
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
904
1
        Ok(Vec::new())
905
1
    }
906
}
907
908
/// Provides the current version of the data in the store.
909
pub trait SchedulerCurrentVersionProvider {
910
    /// Returns the current version of the data in the store.
911
    fn current_version(&self) -> u64;
912
}
913
914
/// Default implementation for when we are not providing a version
915
/// for the data.
916
impl<T> SchedulerCurrentVersionProvider for T
917
where
918
    T: SchedulerStoreKeyProvider<Versioned = FalseValue>,
919
{
920
0
    fn current_version(&self) -> u64 {
921
0
        0
922
0
    }
923
}
924
925
/// Compile time types for booleans.
926
pub trait BoolValue {
927
    const VALUE: bool;
928
}
929
/// Compile time check if something is false.
930
pub trait IsFalse {}
931
/// Compile time check if something is true.
932
pub trait IsTrue {}
933
934
/// Compile time true value.
935
pub struct TrueValue;
936
impl BoolValue for TrueValue {
937
    const VALUE: bool = true;
938
}
939
impl IsTrue for TrueValue {}
940
941
/// Compile time false value.
942
pub struct FalseValue;
943
impl BoolValue for FalseValue {
944
    const VALUE: bool = false;
945
}
946
impl IsFalse for FalseValue {}