Coverage Report

Created: 2025-05-08 18:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/store_trait.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::borrow::{Borrow, BorrowMut};
16
use core::convert::Into;
17
use core::hash::{Hash, Hasher};
18
use core::ops::{Bound, RangeBounds};
19
use core::pin::Pin;
20
use core::ptr::addr_eq;
21
use std::borrow::Cow;
22
use std::collections::hash_map::DefaultHasher as StdHasher;
23
use std::ffi::OsString;
24
use std::sync::{Arc, OnceLock};
25
26
use async_trait::async_trait;
27
use bytes::{Bytes, BytesMut};
28
use futures::{Future, FutureExt, Stream, join, try_join};
29
use nativelink_error::{Code, Error, ResultExt, error_if, make_err};
30
use nativelink_metric::MetricsComponent;
31
use rand::rngs::StdRng;
32
use rand::{RngCore, SeedableRng};
33
use serde::{Deserialize, Serialize};
34
use tokio::io::{AsyncReadExt, AsyncSeekExt};
35
36
use crate::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair};
37
use crate::common::DigestInfo;
38
use crate::digest_hasher::{DigestHasher, DigestHasherFunc, default_digest_hasher_func};
39
use crate::fs;
40
use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
41
42
static DEFAULT_DIGEST_SIZE_HEALTH_CHECK: OnceLock<usize> = OnceLock::new();
43
/// Default digest size for health check data. Any change in this value
44
/// changes the default contract. `GlobalConfig` should be updated to reflect
45
/// changes in this value.
46
pub const DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG: usize = 1024 * 1024;
47
48
// Get the default digest size for health check data, if value is unset a system wide default is used.
49
0
pub fn default_digest_size_health_check() -> usize {
50
0
    *DEFAULT_DIGEST_SIZE_HEALTH_CHECK.get_or_init(|| DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG)
51
0
}
52
53
/// Set the default digest size for health check data, this should be called once.
54
0
pub fn set_default_digest_size_health_check(size: usize) -> Result<(), Error> {
55
0
    DEFAULT_DIGEST_SIZE_HEALTH_CHECK.set(size).map_err(|_| {
56
0
        make_err!(
57
0
            Code::Internal,
58
0
            "set_default_digest_size_health_check already set"
59
0
        )
60
0
    })
61
0
}
62
63
#[derive(Debug, PartialEq, Eq, Copy, Clone, Serialize, Deserialize)]
64
pub enum UploadSizeInfo {
65
    /// When the data transfer amount is known to be exact size, this enum should be used.
66
    /// The receiver store can use this to better optimize the way the data is sent or stored.
67
    ExactSize(u64),
68
69
    /// When the data transfer amount is not known to be exact, the caller should use this enum
70
    /// to provide the maximum size that could possibly be sent. This will bypass the exact size
71
    /// checks, but still provide useful information to the underlying store about the data being
72
    /// sent that it can then use to optimize the upload process.
73
    MaxSize(u64),
74
}
75
76
/// Utility to send all the data to the store from a file.
77
// Note: This is not inlined because some code may want to bypass any underlying
78
// optimizations that may be present in the inner store.
79
8
pub async fn slow_update_store_with_file<S: StoreDriver + ?Sized>(
80
8
    store: Pin<&S>,
81
8
    digest: impl Into<StoreKey<'_>>,
82
8
    file: &mut fs::FileSlot,
83
8
    upload_size: UploadSizeInfo,
84
8
) -> Result<(), Error> {
85
8
    file.rewind()
86
8
        .await
87
8
        .err_tip(|| 
"Failed to rewind in upload_file_to_store"0
)
?0
;
88
8
    let (mut tx, rx) = make_buf_channel_pair();
89
8
90
8
    let update_fut = store
91
8
        .update(digest.into(), rx, upload_size)
92
8
        .map(|r| r.err_tip(|| 
"Could not upload data to store in upload_file_to_store"0
));
93
8
    let read_data_fut = async move {
94
        loop {
95
77
            let mut buf = BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE);
96
77
            let read = file
97
77
                .read_buf(&mut buf)
98
77
                .await
99
77
                .err_tip(|| 
"Failed to read in upload_file_to_store"0
)
?0
;
100
77
            if read == 0 {
  Branch (100:16): [True: 7, False: 5]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [Folded - Ignored]
  Branch (100:16): [True: 1, False: 64]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [Folded - Ignored]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
  Branch (100:16): [True: 0, False: 0]
101
8
                break;
102
69
            }
103
69
            tx.send(buf.freeze())
104
69
                .await
105
69
                .err_tip(|| 
"Failed to send in upload_file_to_store"0
)
?0
;
106
        }
107
8
        tx.send_eof()
108
8
            .err_tip(|| 
"Could not send EOF to store in upload_file_to_store"0
)
109
8
    };
110
8
    tokio::pin!(read_data_fut);
111
8
    let (update_res, read_res) = tokio::join!(update_fut, read_data_fut);
112
8
    update_res.merge(read_res)
113
8
}
114
115
/// Optimizations that stores may want to expose to the callers.
116
/// This is useful for specific cases when the store can optimize the processing
117
/// of the data being processed.
118
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
119
pub enum StoreOptimizations {
120
    /// The store can optimize the upload process when it knows the data is coming from a file.
121
    FileUpdates,
122
123
    /// If the store will ignore the data uploads.
124
    NoopUpdates,
125
126
    /// If the store will never serve downloads.
127
    NoopDownloads,
128
}
129
130
/// A wrapper struct for [`StoreKey`] to work around
131
/// lifetime limitations in `HashMap::get()` as described in
132
/// <https://github.com/rust-lang/rust/issues/80389>
133
///
134
/// As such this is a wrapper type that is stored in the
135
/// maps using the workaround as described in
136
/// <https://blinsay.com/blog/compound-keys/>
137
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
138
#[repr(transparent)]
139
pub struct StoreKeyBorrow(StoreKey<'static>);
140
141
impl From<StoreKey<'static>> for StoreKeyBorrow {
142
5.86k
    fn from(key: StoreKey<'static>) -> Self {
143
5.86k
        Self(key)
144
5.86k
    }
145
}
146
147
impl From<StoreKeyBorrow> for StoreKey<'static> {
148
0
    fn from(key_borrow: StoreKeyBorrow) -> Self {
149
0
        key_borrow.0
150
0
    }
151
}
152
153
impl<'a> Borrow<StoreKey<'a>> for StoreKeyBorrow {
154
4.65k
    fn borrow(&self) -> &StoreKey<'a> {
155
4.65k
        &self.0
156
4.65k
    }
157
}
158
159
impl<'a> Borrow<StoreKey<'a>> for &StoreKeyBorrow {
160
0
    fn borrow(&self) -> &StoreKey<'a> {
161
0
        &self.0
162
0
    }
163
}
164
165
/// Holds something that can be converted into a key the
166
/// store API can understand. Generally this is a digest
167
/// but it can also be a string if the caller wishes to
168
/// store the data directly and reference it by a string
169
/// directly.
170
#[derive(Debug, Eq)]
171
pub enum StoreKey<'a> {
172
    /// A string key.
173
    Str(Cow<'a, str>),
174
175
    /// A key that is a digest.
176
    Digest(DigestInfo),
177
}
178
179
impl<'a> StoreKey<'a> {
180
    /// Creates a new store key from a string.
181
22
    pub const fn new_str(s: &'a str) -> Self {
182
22
        StoreKey::Str(Cow::Borrowed(s))
183
22
    }
184
185
    /// Returns a shallow clone of the key.
186
    /// This is extremely cheap and should be used when clone
187
    /// is needed but the key is not going to be modified.
188
    #[must_use]
189
    #[allow(
190
        clippy::missing_const_for_fn,
191
        reason = "False positive on stable, but not on nightly"
192
    )]
193
6.83k
    pub fn borrow(&'a self) -> Self {
194
60
        match self {
195
22
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Borrowed(s)),
196
38
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Borrowed(s)),
197
6.77k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
198
        }
199
6.83k
    }
200
201
    /// Converts the key into an owned version. This is useful
202
    /// when the caller needs an owned version of the key.
203
7.27k
    pub fn into_owned(self) -> StoreKey<'static> {
204
60
        match self {
205
7
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Owned(s)),
206
53
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Owned(s.to_owned())),
207
7.21k
            StoreKey::Digest(d) => StoreKey::Digest(d),
208
        }
209
7.27k
    }
210
211
    /// Converts the key into a digest. This is useful when the caller
212
    /// must have a digest key. If the data is not a digest, it may
213
    /// hash the underlying key and return a digest of the hash of the key
214
147
    pub fn into_digest(self) -> DigestInfo {
215
147
        match self {
216
145
            StoreKey::Digest(digest) => digest,
217
2
            StoreKey::Str(s) => {
218
2
                let mut hasher = DigestHasherFunc::Blake3.hasher();
219
2
                hasher.update(s.as_bytes());
220
2
                hasher.finalize_digest()
221
            }
222
        }
223
147
    }
224
225
    /// Returns the key as a string. If the key is a digest, it will
226
    /// return a string representation of the digest. If the key is a string,
227
    /// it will return the string itself.
228
65
    pub fn as_str(&'a self) -> Cow<'a, str> {
229
17
        match self {
230
17
            StoreKey::Str(Cow::Owned(s)) => Cow::Borrowed(s),
231
0
            StoreKey::Str(Cow::Borrowed(s)) => Cow::Borrowed(s),
232
48
            StoreKey::Digest(d) => Cow::Owned(format!("{d}")),
233
        }
234
65
    }
235
}
236
237
impl Clone for StoreKey<'static> {
238
5.87k
    fn clone(&self) -> Self {
239
5.87k
        match self {
240
19
            StoreKey::Str(s) => StoreKey::Str(s.clone()),
241
5.85k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
242
        }
243
5.87k
    }
244
}
245
246
impl PartialOrd for StoreKey<'_> {
247
36
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
248
36
        Some(self.cmp(other))
249
36
    }
250
}
251
252
impl Ord for StoreKey<'_> {
253
53
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
254
53
        match (self, other) {
255
53
            (StoreKey::Str(a), StoreKey::Str(b)) => a.cmp(b),
256
0
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a.cmp(b),
257
0
            (StoreKey::Str(_), StoreKey::Digest(_)) => core::cmp::Ordering::Less,
258
0
            (StoreKey::Digest(_), StoreKey::Str(_)) => core::cmp::Ordering::Greater,
259
        }
260
53
    }
261
}
262
263
impl PartialEq for StoreKey<'_> {
264
5.56k
    fn eq(&self, other: &Self) -> bool {
265
5.56k
        match (self, other) {
266
50
            (StoreKey::Str(a), StoreKey::Str(b)) => a == b,
267
5.51k
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a == b,
268
0
            _ => false,
269
        }
270
5.56k
    }
271
}
272
273
impl Hash for StoreKey<'_> {
274
25.2k
    fn hash<H: Hasher>(&self, state: &mut H) {
275
        /// Salts the hash with the enum value that represents
276
        /// the type of the key.
277
        #[repr(u8)]
278
        enum HashId {
279
            Str = 0,
280
            Digest = 1,
281
        }
282
25.2k
        match self {
283
42
            StoreKey::Str(s) => {
284
42
                (HashId::Str as u8).hash(state);
285
42
                s.hash(state);
286
42
            }
287
25.1k
            StoreKey::Digest(d) => {
288
25.1k
                (HashId::Digest as u8).hash(state);
289
25.1k
                d.hash(state);
290
25.1k
            }
291
        }
292
25.2k
    }
293
}
294
295
impl<'a> From<&'a str> for StoreKey<'a> {
296
0
    fn from(s: &'a str) -> Self {
297
0
        StoreKey::Str(Cow::Borrowed(s))
298
0
    }
299
}
300
301
impl From<String> for StoreKey<'static> {
302
0
    fn from(s: String) -> Self {
303
0
        StoreKey::Str(Cow::Owned(s))
304
0
    }
305
}
306
307
impl From<DigestInfo> for StoreKey<'_> {
308
10.4k
    fn from(d: DigestInfo) -> Self {
309
10.4k
        StoreKey::Digest(d)
310
10.4k
    }
311
}
312
313
impl From<&DigestInfo> for StoreKey<'_> {
314
39
    fn from(d: &DigestInfo) -> Self {
315
39
        StoreKey::Digest(*d)
316
39
    }
317
}
318
319
#[derive(Clone, MetricsComponent)]
320
#[repr(transparent)]
321
pub struct Store {
322
    #[metric]
323
    inner: Arc<dyn StoreDriver>,
324
}
325
326
impl core::fmt::Debug for Store {
327
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
328
0
        f.debug_struct("Store").finish_non_exhaustive()
329
0
    }
330
}
331
332
impl Store {
333
269
    pub fn new(inner: Arc<dyn StoreDriver>) -> Self {
334
269
        Self { inner }
335
269
    }
336
}
337
338
impl Store {
339
    /// Returns the immediate inner store driver.
340
    /// Note: This does not recursively try to resolve underlying store drivers
341
    /// like `.inner_store()` does.
342
    #[inline]
343
1
    pub fn into_inner(self) -> Arc<dyn StoreDriver> {
344
1
        self.inner
345
1
    }
346
347
    /// Gets the underlying store for the given digest.
348
    /// A caller might want to use this to obtain a reference to the "real" underlying store
349
    /// (if applicable) and check if it implements some special traits that allow optimizations.
350
    /// Note: If the store performs complex operations on the data, it should return itself.
351
    #[inline]
352
164
    pub fn inner_store<'a, K: Into<StoreKey<'a>>>(&self, digest: Option<K>) -> &dyn StoreDriver {
353
164
        self.inner.inner_store(digest.map(Into::into))
354
164
    }
355
356
    /// Tries to cast the underlying store to the given type.
357
    #[inline]
358
65
    pub fn downcast_ref<U: StoreDriver>(&self, maybe_digest: Option<StoreKey<'_>>) -> Option<&U> {
359
65
        self.inner.inner_store(maybe_digest).as_any().downcast_ref()
360
65
    }
361
362
    /// Register health checks used to monitor the store.
363
    #[inline]
364
0
    pub fn register_health(&self, registry: &mut HealthRegistryBuilder) {
365
0
        self.inner.clone().register_health(registry);
366
0
    }
367
}
368
369
impl StoreLike for Store {
370
    #[inline]
371
10.0k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
372
10.0k
        self.inner.as_ref()
373
10.0k
    }
374
375
7
    fn as_pin(&self) -> Pin<&Self> {
376
7
        Pin::new(self)
377
7
    }
378
}
379
380
impl<T> StoreLike for T
381
where
382
    T: StoreDriver + Sized,
383
{
384
    #[inline]
385
7.99k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
386
7.99k
        self
387
7.99k
    }
388
389
84
    fn as_pin(&self) -> Pin<&Self> {
390
84
        Pin::new(self)
391
84
    }
392
}
393
394
pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
395
    /// Returns the immediate inner store driver.
396
    fn as_store_driver(&self) -> &'_ dyn StoreDriver;
397
398
    /// Utility function to return a pinned reference to self.
399
    fn as_pin(&self) -> Pin<&Self>;
400
401
    /// Utility function to return a pinned reference to the store driver.
402
    #[inline]
403
18.0k
    fn as_store_driver_pin(&self) -> Pin<&'_ dyn StoreDriver> {
404
18.0k
        Pin::new(self.as_store_driver())
405
18.0k
    }
406
407
    /// Look up a digest in the store and return None if it does not exist in
408
    /// the store, or Some(size) if it does.
409
    /// Note: On an AC store the size will be incorrect and should not be used!
410
    #[inline]
411
228
    fn has<'a>(
412
228
        &'a self,
413
228
        digest: impl Into<StoreKey<'a>>,
414
228
    ) -> impl Future<Output = Result<Option<u64>, Error>> + 'a {
415
228
        self.as_store_driver_pin().has(digest.into())
416
228
    }
417
418
    /// Look up a list of digests in the store and return a result for each in
419
    /// the same order as input.  The result will either be None if it does not
420
    /// exist in the store, or Some(size) if it does.
421
    /// Note: On an AC store the size will be incorrect and should not be used!
422
    #[inline]
423
19
    fn has_many<'a>(
424
19
        &'a self,
425
19
        digests: &'a [StoreKey<'a>],
426
19
    ) -> impl Future<Output = Result<Vec<Option<u64>>, Error>> + Send + 'a {
427
19
        self.as_store_driver_pin().has_many(digests)
428
19
    }
429
430
    /// The implementation of the above has and `has_many` functions.  See their
431
    /// documentation for details.
432
    #[inline]
433
42
    fn has_with_results<'a>(
434
42
        &'a self,
435
42
        digests: &'a [StoreKey<'a>],
436
42
        results: &'a mut [Option<u64>],
437
42
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
438
42
        self.as_store_driver_pin()
439
42
            .has_with_results(digests, results)
440
42
    }
441
442
    /// List all the keys in the store that are within the given range.
443
    /// `handler` is called for each key in the range. If `handler` returns
444
    /// false, the listing is stopped.
445
    ///
446
    /// The number of keys passed through the handler is the return value.
447
    #[inline]
448
15
    fn list<'a, 'b>(
449
15
        &'a self,
450
15
        range: impl RangeBounds<StoreKey<'b>> + Send + 'b,
451
15
        mut handler: impl for<'c> FnMut(&'c StoreKey) -> bool + Send + Sync + 'a,
452
15
    ) -> impl Future<Output = Result<u64, Error>> + Send + 'a
453
15
    where
454
15
        'b: 'a,
455
15
    {
456
        // Note: We use a manual async move, so the future can own the `range` and `handler`,
457
        // otherwise we'd require the caller to pass them in by reference making more borrow
458
        // checker noise.
459
15
        async move {
460
15
            self.as_store_driver_pin()
461
15
                .list(
462
15
                    (
463
15
                        range.start_bound().map(StoreKey::borrow),
464
15
                        range.end_bound().map(StoreKey::borrow),
465
15
                    ),
466
15
                    &mut handler,
467
15
                )
468
15
                .await
469
15
        }
470
15
    }
471
472
    /// Sends the data to the store.
473
    #[inline]
474
5.21k
    fn update<'a>(
475
5.21k
        &'a self,
476
5.21k
        digest: impl Into<StoreKey<'a>>,
477
5.21k
        reader: DropCloserReadHalf,
478
5.21k
        upload_size: UploadSizeInfo,
479
5.21k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
480
5.21k
        self.as_store_driver_pin()
481
5.21k
            .update(digest.into(), reader, upload_size)
482
5.21k
    }
483
484
    /// Any optimizations the store might want to expose to the callers.
485
    /// By default, no optimizations are exposed.
486
    #[inline]
487
14
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
488
14
        self.as_store_driver_pin().optimized_for(optimization)
489
14
    }
490
491
    /// Specialized version of `.update()` which takes a `FileSlot`.
492
    /// This is useful if the underlying store can optimize the upload process
493
    /// when it knows the data is coming from a file.
494
    #[inline]
495
10
    fn update_with_whole_file<'a>(
496
10
        &'a self,
497
10
        digest: impl Into<StoreKey<'a>>,
498
10
        path: OsString,
499
10
        file: fs::FileSlot,
500
10
        upload_size: UploadSizeInfo,
501
10
    ) -> impl Future<Output = Result<Option<fs::FileSlot>, Error>> + Send + 'a {
502
10
        self.as_store_driver_pin()
503
10
            .update_with_whole_file(digest.into(), path, file, upload_size)
504
10
    }
505
506
    /// Utility to send all the data to the store when you have all the bytes.
507
    #[inline]
508
5.66k
    fn update_oneshot<'a>(
509
5.66k
        &'a self,
510
5.66k
        digest: impl Into<StoreKey<'a>>,
511
5.66k
        data: Bytes,
512
5.66k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
513
5.66k
        self.as_store_driver_pin()
514
5.66k
            .update_oneshot(digest.into(), data)
515
5.66k
    }
516
517
    /// Retrieves part of the data from the store and writes it to the given writer.
518
    #[inline]
519
1.30k
    fn get_part<'a>(
520
1.30k
        &'a self,
521
1.30k
        digest: impl Into<StoreKey<'a>>,
522
1.30k
        mut writer: impl BorrowMut<DropCloserWriteHalf> + Send + 'a,
523
1.30k
        offset: u64,
524
1.30k
        length: Option<u64>,
525
1.30k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
526
1.30k
        let key = digest.into();
527
        // Note: We need to capture `writer` just in case the caller
528
        // expects the drop() method to be called on it when the future
529
        // is done due to the complex interaction between the DropCloserWriteHalf
530
        // and the DropCloserReadHalf during drop().
531
1.30k
        async move {
532
1.30k
            self.as_store_driver_pin()
533
1.30k
                .get_part(key, writer.borrow_mut(), offset, length)
534
1.30k
                .await
535
1.30k
        }
536
1.30k
    }
537
538
    /// Utility that works the same as `.get_part()`, but writes all the data.
539
    #[inline]
540
15
    fn get<'a>(
541
15
        &'a self,
542
15
        key: impl Into<StoreKey<'a>>,
543
15
        writer: DropCloserWriteHalf,
544
15
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
545
15
        self.as_store_driver_pin().get(key.into(), writer)
546
15
    }
547
548
    /// Utility that will return all the bytes at once instead of in a streaming manner.
549
    #[inline]
550
5.25k
    fn get_part_unchunked<'a>(
551
5.25k
        &'a self,
552
5.25k
        key: impl Into<StoreKey<'a>>,
553
5.25k
        offset: u64,
554
5.25k
        length: Option<u64>,
555
5.25k
    ) -> impl Future<Output = Result<Bytes, Error>> + Send + 'a {
556
5.25k
        self.as_store_driver_pin()
557
5.25k
            .get_part_unchunked(key.into(), offset, length)
558
5.25k
    }
559
560
    /// Default implementation of the health check. Some stores may want to override this
561
    /// in situations where the default implementation is not sufficient.
562
    #[inline]
563
0
    fn check_health(
564
0
        &self,
565
0
        namespace: Cow<'static, str>,
566
0
    ) -> impl Future<Output = HealthStatus> + Send {
567
0
        self.as_store_driver_pin().check_health(namespace)
568
0
    }
569
}
570
571
#[async_trait]
572
pub trait StoreDriver:
573
    Sync + Send + Unpin + MetricsComponent + HealthStatusIndicator + 'static
574
{
575
    /// See: [`StoreLike::has`] for details.
576
    #[inline]
577
488
    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
578
244
        let mut result = [None];
579
244
        self.has_with_results(&[key], &mut result).await
?0
;
580
244
        Ok(result[0])
581
488
    }
582
583
    /// See: [`StoreLike::has_many`] for details.
584
    #[inline]
585
    async fn has_many(
586
        self: Pin<&Self>,
587
        digests: &[StoreKey<'_>],
588
38
    ) -> Result<Vec<Option<u64>>, Error> {
589
19
        let mut results = vec![None; digests.len()];
590
19
        self.has_with_results(digests, &mut results).await
?0
;
591
19
        Ok(results)
592
38
    }
593
594
    /// See: [`StoreLike::has_with_results`] for details.
595
    async fn has_with_results(
596
        self: Pin<&Self>,
597
        digests: &[StoreKey<'_>],
598
        results: &mut [Option<u64>],
599
    ) -> Result<(), Error>;
600
601
    /// See: [`StoreLike::list`] for details.
602
    async fn list(
603
        self: Pin<&Self>,
604
        _range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
605
        _handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
606
0
    ) -> Result<u64, Error> {
607
        // TODO(aaronmondal) We should force all stores to implement this function instead of
608
        // providing a default implementation.
609
0
        Err(make_err!(
610
0
            Code::Unimplemented,
611
0
            "Store::list() not implemented for this store"
612
0
        ))
613
0
    }
614
615
    /// See: [`StoreLike::update`] for details.
616
    async fn update(
617
        self: Pin<&Self>,
618
        key: StoreKey<'_>,
619
        reader: DropCloserReadHalf,
620
        upload_size: UploadSizeInfo,
621
    ) -> Result<(), Error>;
622
623
    /// See: [`StoreLike::optimized_for`] for details.
624
90
    fn optimized_for(&self, _optimization: StoreOptimizations) -> bool {
625
90
        false
626
90
    }
627
628
    /// See: [`StoreLike::update_with_whole_file`] for details.
629
    async fn update_with_whole_file(
630
        self: Pin<&Self>,
631
        key: StoreKey<'_>,
632
        path: OsString,
633
        mut file: fs::FileSlot,
634
        upload_size: UploadSizeInfo,
635
2
    ) -> Result<Option<fs::FileSlot>, Error> {
636
1
        let inner_store = self.inner_store(Some(key.borrow()));
637
1
        if inner_store.optimized_for(StoreOptimizations::FileUpdates) {
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [Folded - Ignored]
  Branch (637:12): [True: 0, False: 1]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [Folded - Ignored]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
  Branch (637:12): [True: 0, False: 0]
638
0
            error_if!(
639
0
                addr_eq(inner_store, &*self),
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [Folded - Ignored]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [Folded - Ignored]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
  Branch (639:17): [True: 0, False: 0]
640
                "Store::inner_store() returned self when optimization present"
641
            );
642
0
            return Pin::new(inner_store)
643
0
                .update_with_whole_file(key, path, file, upload_size)
644
0
                .await;
645
1
        }
646
1
        slow_update_store_with_file(self, key, &mut file, upload_size).await
?0
;
647
1
        Ok(Some(file))
648
2
    }
649
650
    /// See: [`StoreLike::update_oneshot`] for details.
651
11.5k
    async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> {
652
        // TODO(aaronmondal) This is extremely inefficient, since we have exactly
653
        // what we need here. Maybe we could instead make a version of the stream
654
        // that can take objects already fully in memory instead?
655
5.75k
        let (mut tx, rx) = make_buf_channel_pair();
656
657
5.75k
        let data_len =
658
5.75k
            u64::try_from(data.len()).err_tip(|| 
"Could not convert data.len() to u64"0
)
?0
;
659
5.75k
        let send_fut = async move {
660
5.75k
            // Only send if we are not EOF.
661
5.75k
            if !data.is_empty() {
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 32, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [Folded - Ignored]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 1, False: 0]
  Branch (661:16): [True: 16, False: 40]
  Branch (661:16): [True: 8, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 4, False: 1]
  Branch (661:16): [True: 7, False: 0]
  Branch (661:16): [True: 90, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 2, False: 0]
  Branch (661:16): [True: 2, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 4, False: 0]
  Branch (661:16): [True: 1, False: 0]
  Branch (661:16): [True: 2, False: 0]
  Branch (661:16): [True: 2, False: 0]
  Branch (661:16): [True: 2, False: 0]
  Branch (661:16): [True: 2, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 11, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 406, False: 1]
  Branch (661:16): [Folded - Ignored]
  Branch (661:16): [True: 3, False: 2]
  Branch (661:16): [True: 2, False: 0]
  Branch (661:16): [True: 1, False: 0]
  Branch (661:16): [True: 13, False: 1]
  Branch (661:16): [True: 47, False: 30]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 1, False: 0]
  Branch (661:16): [True: 5.00k, False: 0]
  Branch (661:16): [True: 6, False: 0]
  Branch (661:16): [True: 4, False: 0]
  Branch (661:16): [True: 2, False: 0]
  Branch (661:16): [True: 0, False: 0]
  Branch (661:16): [True: 7, False: 0]
662
5.68k
                tx.send(data)
663
5.68k
                    .await
664
5.68k
                    .err_tip(|| 
"Failed to write data in update_oneshot"0
)
?0
;
665
75
            }
666
5.75k
            tx.send_eof()
667
5.75k
                .err_tip(|| 
"Failed to write EOF in update_oneshot"0
)
?0
;
668
5.75k
            Ok(())
669
5.75k
        };
670
5.75k
        try_join!(
671
5.75k
            send_fut,
672
5.75k
            self.update(key, rx, UploadSizeInfo::ExactSize(data_len))
673
2
        )?;
674
5.75k
        Ok(())
675
11.5k
    }
676
677
    /// See: [`StoreLike::get_part`] for details.
    ///
    /// Required method: it is declared without a default body, and the
    /// provided `get` and `get_part_unchunked` methods of this trait are both
    /// implemented by calling it.
    ///
    /// NOTE(review): `offset` and `length` presumably select a byte range of
    /// the stored value, with `None` meaning "until the end" (`get` passes
    /// `0` and `None`) - confirm against `StoreLike::get_part`.
678
    async fn get_part(
679
        self: Pin<&Self>,
680
        key: StoreKey<'_>,
681
        writer: &mut DropCloserWriteHalf,
682
        offset: u64,
683
        length: Option<u64>,
684
    ) -> Result<(), Error>;
685
686
    /// See: [`StoreLike::get`] for details.
687
    #[inline]
688
    async fn get(
689
        self: Pin<&Self>,
690
        key: StoreKey<'_>,
691
        mut writer: DropCloserWriteHalf,
692
38
    ) -> Result<(), Error> {
693
19
        self.get_part(key, &mut writer, 0, None).await
694
37
    }
695
696
    /// See: [`StoreLike::get_part_unchunked`] for details.
697
    async fn get_part_unchunked(
698
        self: Pin<&Self>,
699
        key: StoreKey<'_>,
700
        offset: u64,
701
        length: Option<u64>,
702
10.7k
    ) -> Result<Bytes, Error> {
703
5.35k
        let length_usize = length
704
5.35k
            .map(|v| 
usize::try_from(v).err_tip2.24k
(||
"Could not convert length to usize"0
))
705
5.35k
            .transpose()
?0
;
706
707
        // TODO(aaronmondal) This is extremely inefficient, since we have exactly
708
        // what we need here. Maybe we could instead make a version of the stream
709
        // that can take objects already fully in memory instead?
710
5.35k
        let (mut tx, mut rx) = make_buf_channel_pair();
711
712
5.35k
        let (data_res, get_part_res) = join!(
713
5.35k
            rx.consume(length_usize),
714
            // We use a closure here to ensure that the `tx` is dropped when the
715
            // future is done.
716
5.35k
            async move { self.get_part(key, &mut tx, offset, length).await },
717
        );
718
5.35k
        get_part_res
719
5.35k
            .err_tip(|| 
"Failed to get_part in get_part_unchunked"11
)
720
5.35k
            .merge(data_res.err_tip(|| 
"Failed to read stream to completion in get_part_unchunked"11
))
721
10.7k
    }
722
723
    /// See: [`StoreLike::check_health`] for details.
724
0
    async fn check_health(self: Pin<&Self>, namespace: Cow<'static, str>) -> HealthStatus {
725
0
        let digest_data_size = default_digest_size_health_check();
726
0
        let mut digest_data = vec![0u8; digest_data_size];
727
0
728
0
        let mut namespace_hasher = StdHasher::new();
729
0
        namespace.hash(&mut namespace_hasher);
730
0
        self.get_name().hash(&mut namespace_hasher);
731
0
        let hash_seed = namespace_hasher.finish();
732
0
733
0
        // Fill the digest data with random data based on a stable
734
0
        // hash of the namespace and store name. Intention is to
735
0
        // have randomly filled data that is unique per store and
736
0
        // does not change between health checks. This is to ensure
737
0
        // we are not adding more data to store on each health check.
738
0
        let mut rng: StdRng = StdRng::seed_from_u64(hash_seed);
739
0
        rng.fill_bytes(&mut digest_data);
740
0
741
0
        let mut digest_hasher = default_digest_hasher_func().hasher();
742
0
        digest_hasher.update(&digest_data);
743
0
        let digest_data_len = digest_data.len() as u64;
744
0
        let digest_info = StoreKey::from(digest_hasher.finalize_digest());
745
0
746
0
        let digest_bytes = Bytes::copy_from_slice(&digest_data);
747
748
0
        if let Err(e) = self
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [Folded - Ignored]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [Folded - Ignored]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
  Branch (748:16): [True: 0, False: 0]
749
0
            .update_oneshot(digest_info.borrow(), digest_bytes.clone())
750
0
            .await
751
        {
752
0
            return HealthStatus::new_failed(
753
0
                self.get_ref(),
754
0
                format!("Store.update_oneshot() failed: {e}").into(),
755
            );
756
0
        }
757
0
758
0
        match self.has(digest_info.borrow()).await {
759
0
            Ok(Some(s)) => {
760
0
                if s != digest_data_len {
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [Folded - Ignored]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [Folded - Ignored]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
  Branch (760:20): [True: 0, False: 0]
761
0
                    return HealthStatus::new_failed(
762
0
                        self.get_ref(),
763
0
                        format!("Store.has() size mismatch {s} != {digest_data_len}").into(),
764
                    );
765
0
                }
766
            }
767
            Ok(None) => {
768
0
                return HealthStatus::new_failed(
769
0
                    self.get_ref(),
770
0
                    "Store.has() size not found".into(),
771
                );
772
            }
773
0
            Err(e) => {
774
0
                return HealthStatus::new_failed(
775
0
                    self.get_ref(),
776
0
                    format!("Store.has() failed: {e}").into(),
777
                );
778
            }
779
        }
780
781
0
        match self
782
0
            .get_part_unchunked(digest_info, 0, Some(digest_data_len))
783
0
            .await
784
        {
785
0
            Ok(b) => {
786
0
                if b != digest_bytes {
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [Folded - Ignored]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [Folded - Ignored]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 0]
787
0
                    return HealthStatus::new_failed(
788
0
                        self.get_ref(),
789
0
                        "Store.get_part_unchunked() data mismatch".into(),
790
                    );
791
0
                }
792
            }
793
0
            Err(e) => {
794
0
                return HealthStatus::new_failed(
795
0
                    self.get_ref(),
796
0
                    format!("Store.get_part_unchunked() failed: {e}").into(),
797
                );
798
            }
799
        }
800
801
0
        HealthStatus::new_ok(self.get_ref(), "Successfully store health check".into())
802
0
    }
803
804
    /// See: [`Store::inner_store`] for details.
    ///
    /// Required method (no default body). Returns a borrowed `StoreDriver`
    /// trait object.
    ///
    /// NOTE(review): what the optional key is used for is not visible in this
    /// declaration - confirm against `Store::inner_store`.
805
    fn inner_store(&self, _digest: Option<StoreKey<'_>>) -> &dyn StoreDriver;
806
807
    /// Returns an Any variation of whatever Self is.
    ///
    /// The result is a borrowed `dyn Any + Sync + Send + 'static` view of the
    /// implementor.
    ///
    /// NOTE(review): presumably used by callers to downcast to the concrete
    /// store type - confirm.
808
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static);
809
    /// Owned counterpart of `as_any`: consumes an `Arc<Self>` and returns it
    /// as an `Arc<dyn Any + Sync + Send + 'static>` instead of a borrowed
    /// reference.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static>;
810
811
    // Register health checks used to monitor the store.
    //
    // The default implementation has an empty body, so nothing is added to
    // the registry unless an implementor overrides this method.
812
0
    fn register_health(self: Arc<Self>, _registry: &mut HealthRegistryBuilder) {}
813
}
814
815
/// The instructions on how to decode a value from a Bytes & version into
816
/// the underlying type.
817
pub trait SchedulerStoreDecodeTo {
818
    /// The type produced by a successful `decode`.
    type DecodeOutput;
819
    /// Decodes `data` together with its `version` into `DecodeOutput`.
    ///
    /// This is an associated function (there is no `self` receiver), so it is
    /// invoked on the implementing type rather than on a value.
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error>;
820
}
821
822
/// A subscription handle, used as the `Subscription` associated type of
/// `SchedulerSubscriptionManager`.
pub trait SchedulerSubscription: Send + Sync {
823
    /// Returns a `Send` future that resolves to `Result<(), Error>`.
    ///
    /// NOTE(review): presumably the future resolves once the subscribed key
    /// has changed - confirm against the implementors.
    fn changed(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
824
}
825
826
/// Creates subscriptions; used as the `SubscriptionManager` associated type
/// of `SchedulerStore`.
pub trait SchedulerSubscriptionManager: Send + Sync {
827
    /// The subscription type returned by `subscribe`.
    type Subscription: SchedulerSubscription;
828
829
    /// NOTE(review): judging by the name this is a test-only hook; what
    /// `value` identifies is not visible here - confirm with implementors.
    fn notify_for_test(&self, value: String);
830
831
    /// Creates a subscription for `key`, or returns an error.
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
832
    where
833
        K: SchedulerStoreKeyProvider;
834
}
835
836
/// The API surface for a scheduler store.
837
pub trait SchedulerStore: Send + Sync + 'static {
838
    /// The subscription manager type handed out by `subscription_manager`.
    type SubscriptionManager: SchedulerSubscriptionManager;
839
840
    /// Returns the subscription manager for the scheduler store.
841
    fn subscription_manager(&self) -> Result<Arc<Self::SubscriptionManager>, Error>;
842
843
    /// Updates or inserts an entry into the underlying store.
844
    /// Metadata about the key is attached to the compile-time type.
845
    /// If `SchedulerStoreKeyProvider::Versioned` is `TrueValue`, the data will not
846
    /// be updated if the current version in the database does not match
847
    /// the version in the passed in data.
848
    /// No guarantees are made when `Versioned` is `FalseValue`.
849
    /// Indexes are guaranteed to be updated atomically with the data.
    ///
    /// NOTE(review): the `Option<u64>` in the result is presumably the
    /// version after the write, with `None` for a rejected versioned
    /// update - confirm against the implementors.
850
    fn update_data<T>(&self, data: T) -> impl Future<Output = Result<Option<u64>, Error>> + Send
851
    where
852
        T: SchedulerStoreDataProvider
853
            + SchedulerStoreKeyProvider
854
            + SchedulerCurrentVersionProvider
855
            + Send;
856
857
    /// Searches for all keys in the store that match the given index prefix.
    ///
    /// The returned future resolves to a stream whose items are individually
    /// fallible values of `K`'s `DecodeOutput` type.
858
    fn search_by_index_prefix<K>(
859
        &self,
860
        index: K,
861
    ) -> impl Future<
862
        Output = Result<
863
            impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
864
            Error,
865
        >,
866
    > + Send
867
    where
868
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send;
869
870
    /// Returns data for the provided key with the given version if
871
    /// `SchedulerStoreKeyProvider::Versioned` is `TrueValue`.
    ///
    /// NOTE(review): `Ok(None)` presumably means the key was not found -
    /// confirm against the implementors.
872
    fn get_and_decode<K>(
873
        &self,
874
        key: K,
875
    ) -> impl Future<Output = Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>> + Send
876
    where
877
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send;
878
}
879
880
/// A type that is used to let the scheduler store know what
881
/// index is being requested.
882
pub trait SchedulerIndexProvider {
883
    /// Only keys inserted with this prefix will be indexed.
884
    const KEY_PREFIX: &'static str;
885
886
    /// The name of the index.
887
    const INDEX_NAME: &'static str;
888
889
    /// The sort key for the index (if any).
    ///
    /// Defaults to `None`; implementors override it to declare a sort key.
890
    const MAYBE_SORT_KEY: Option<&'static str> = None;
891
892
    /// If the data is versioned.
    ///
    /// A type-level boolean: any implementor of `BoolValue`.
893
    type Versioned: BoolValue;
894
895
    /// The value of the index.
    ///
    /// Returned as a `Cow`, so implementors may either borrow from `self` or
    /// return an owned string.
896
    fn index_value(&self) -> Cow<'_, str>;
897
}
898
899
/// Provides a key to lookup data in the store.
900
pub trait SchedulerStoreKeyProvider {
901
    /// If the data is versioned.
    ///
    /// A type-level boolean: any implementor of `BoolValue`. Implementors
    /// that set it to `FalseValue` receive the blanket
    /// `SchedulerCurrentVersionProvider` impl defined in this file.
902
    type Versioned: BoolValue;
903
904
    /// Returns the key for the data.
    ///
    /// The key carries the `'static` lifetime, so it does not borrow from
    /// `self`.
905
    fn get_key(&self) -> StoreKey<'static>;
906
}
907
908
/// Provides data to be stored in the scheduler store.
909
pub trait SchedulerStoreDataProvider {
910
    /// Converts the data into bytes to be stored in the store.
    ///
    /// Consumes `self`; anything needed afterwards (for example the result
    /// of `get_indexes`) has to be obtained before calling this.
911
    fn try_into_bytes(self) -> Result<Bytes, Error>;
912
913
    /// Returns the indexes for the data if any.
    ///
    /// Each entry is a `(&'static str, Bytes)` pair. The default
    /// implementation returns an empty list.
914
1
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
915
1
        Ok(Vec::new())
916
1
    }
917
}
918
919
/// Provides the current version of the data in the store.
///
/// A blanket impl in this file covers every `SchedulerStoreKeyProvider`
/// whose `Versioned` type is `FalseValue`.
920
pub trait SchedulerCurrentVersionProvider {
921
    /// Returns the current version of the data in the store.
922
    fn current_version(&self) -> u64;
923
}
924
925
/// Default implementation for when we are not providing a version
926
/// for the data.
927
impl<T> SchedulerCurrentVersionProvider for T
928
where
929
    T: SchedulerStoreKeyProvider<Versioned = FalseValue>,
930
{
931
0
    fn current_version(&self) -> u64 {
932
0
        0
933
0
    }
934
}
935
936
/// Compile time types for booleans.
///
/// Implemented in this file by `TrueValue` and `FalseValue`.
937
pub trait BoolValue {
938
    /// The boolean the implementing type stands for.
    const VALUE: bool;
939
}
940
/// Compile time check if something is false.
///
/// Marker trait with no items; in the part of the file visible here it is
/// implemented only for `FalseValue`.
941
pub trait IsFalse {}
942
/// Compile time check if something is true.
///
/// Marker trait with no items; in the part of the file visible here it is
/// implemented only for `TrueValue`.
943
pub trait IsTrue {}
944
945
/// Compile time true value.
///
/// Unit struct that carries `true` at the type level through `BoolValue`
/// and is tagged with the `IsTrue` marker.
946
#[derive(Debug, Clone, Copy)]
947
pub struct TrueValue;
948
impl BoolValue for TrueValue {
949
    const VALUE: bool = true;
950
}
951
impl IsTrue for TrueValue {}
952
953
/// Compile time false value.
///
/// Unit struct that carries `false` at the type level through `BoolValue`
/// and is tagged with the `IsFalse` marker.
954
#[derive(Debug, Clone, Copy)]
955
pub struct FalseValue;
956
impl BoolValue for FalseValue {
957
    const VALUE: bool = false;
958
}
959
impl IsFalse for FalseValue {}