Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-util/src/store_trait.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::{BorrowMut, Cow};
16
use std::collections::hash_map::DefaultHasher as StdHasher;
17
use std::convert::Into;
18
use std::hash::{Hash, Hasher};
19
use std::ops::{Bound, RangeBounds};
20
use std::pin::Pin;
21
use std::ptr::addr_eq;
22
use std::sync::{Arc, OnceLock};
23
24
use async_trait::async_trait;
25
use bytes::{Bytes, BytesMut};
26
use futures::future::{select, Either};
27
use futures::{join, try_join, Future, FutureExt, Stream};
28
use nativelink_error::{error_if, make_err, Code, Error, ResultExt};
29
use nativelink_metric::MetricsComponent;
30
use rand::rngs::StdRng;
31
use rand::{RngCore, SeedableRng};
32
use serde::{Deserialize, Serialize};
33
use tokio::io::AsyncSeekExt;
34
use tokio::time::timeout;
35
36
use crate::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
37
use crate::common::DigestInfo;
38
use crate::digest_hasher::{default_digest_hasher_func, DigestHasher, DigestHasherFunc};
39
use crate::fs::{self, idle_file_descriptor_timeout};
40
use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
41
42
/// Process-wide cell holding the digest size used for health check data.
/// It is written at most once, either by
/// `set_default_digest_size_health_check` or lazily by
/// `default_digest_size_health_check`.
static DEFAULT_DIGEST_SIZE_HEALTH_CHECK: OnceLock<usize> = OnceLock::new();

/// Default digest size for health check data. Any change in this value
/// changes the default contract. `GlobalConfig` should be updated to reflect
/// changes in this value.
pub const DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG: usize = 1024 * 1024;

/// Returns the digest size to use for health check data.
///
/// When no value has been stored yet, the cell is initialized with
/// [`DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG`] and that value is returned.
/// Because reading initializes the cell, a later attempt to set a custom
/// size fails; configure the size before the first read.
pub fn default_digest_size_health_check() -> usize {
    *DEFAULT_DIGEST_SIZE_HEALTH_CHECK.get_or_init(|| DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG)
}
52
53
/// Set the default digest size for health check data, this should be called once.
54
0
pub fn set_default_digest_size_health_check(size: usize) -> Result<(), Error> {
55
0
    DEFAULT_DIGEST_SIZE_HEALTH_CHECK.set(size).map_err(|_| {
56
0
        make_err!(
57
0
            Code::Internal,
58
0
            "set_default_digest_size_health_check already set"
59
0
        )
60
0
    })
61
0
}
62
63
2.45k
#[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)]
64
pub enum UploadSizeInfo {
65
    /// When the data transfer amount is known to be exact size, this enum should be used.
66
    /// The receiver store can use this to better optimize the way the data is sent or stored.
67
    ExactSize(u64),
68
69
    /// When the data transfer amount is not known to be exact, the caller should use this enum
70
    /// to provide the maximum size that could possibly be sent. This will bypass the exact size
71
    /// checks, but still provide useful information to the underlying store about the data being
72
    /// sent that it can then use to optimize the upload process.
73
    MaxSize(u64),
74
}
75
76
/// Utility to send all the data to the store from a file.
77
// Note: This is not inlined because some code may want to bypass any underlying
78
// optimizations that may be present in the inner store.
79
4
pub async fn slow_update_store_with_file<S: StoreDriver + ?Sized>(
80
4
    store: Pin<&S>,
81
4
    digest: impl Into<StoreKey<'_>>,
82
4
    file: &mut fs::ResumeableFileSlot,
83
4
    upload_size: UploadSizeInfo,
84
4
) -> Result<(), Error> {
85
4
    file.as_writer()
86
0
        .await
87
4
        .err_tip(|| 
"Failed to get writer in upload_file_to_store"0
)
?0
88
4
        .rewind()
89
4
        .await
90
4
        .err_tip(|| 
"Failed to rewind in upload_file_to_store"0
)
?0
;
91
4
    let (tx, rx) = make_buf_channel_pair();
92
4
93
4
    let mut update_fut = store
94
4
        .update(digest.into(), rx, upload_size)
95
4
        .map(|r| r.err_tip(|| 
"Could not upload data to store in upload_file_to_store"0
));
96
4
    let read_result = {
97
4
        let read_data_fut = async {
98
4
            let (_, mut tx) = file
99
4
                .read_buf_cb(
100
4
                    (BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx),
101
67
                    move |(chunk, mut tx)| async move {
102
67
                        tx.send(chunk.freeze())
103
0
                            .await
104
67
                            .err_tip(|| 
"Failed to send in upload_file_to_store"0
)
?0
;
105
67
                        Ok((BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx))
106
134
                    },
107
4
                )
108
74
                .await
109
4
                .err_tip(|| 
"Error in upload_file_to_store::read_buf_cb section"0
)
?0
;
110
4
            tx.send_eof()
111
4
                .err_tip(|| 
"Could not send EOF to store in upload_file_to_store"0
)
?0
;
112
4
            Ok(())
113
4
        };
114
4
        tokio::pin!(read_data_fut);
115
74
        match select(&mut update_fut, read_data_fut).await {
116
0
            Either::Left((update_result, read_data_fut)) => {
117
0
                return update_result.merge(read_data_fut.await)
118
            }
119
4
            Either::Right((read_result, _)) => read_result,
120
        }
121
    };
122
4
    if let Ok(
update_result3
) = timeout(idle_file_descriptor_timeout(), &mut update_fut).
await1
{
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 2, False: 1]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [Folded - Ignored]
  Branch (122:12): [True: 1, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [Folded - Ignored]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
123
3
        update_result.merge(read_result)
124
    } else {
125
1
        file.close_file()
126
1
            .await
127
1
            .err_tip(|| 
"Failed to close file in upload_file_to_store"0
)
?0
;
128
5
        
update_fut1
.await.
merge(read_result)1
129
    }
130
4
}
131
132
/// Optimizations a store may advertise to its callers.
/// Callers can query these to pick a cheaper code path for the specific
/// cases in which the store can optimize how the data is processed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StoreOptimizations {
    /// The store can optimize the upload process when it knows the data is coming from a file.
    FileUpdates,

    /// The store ignores data uploads.
    NoopUpdates,

    /// The store never serves downloads.
    NoopDownloads,
}
146
147
/// Holds something that can be converted into a key the
148
/// store API can understand. Generally this is a digest
149
/// but it can also be a string if the caller wishes to
150
/// store the data directly and reference it by a string
151
/// directly.
152
#[derive(Debug, Eq)]
153
pub enum StoreKey<'a> {
154
    /// A string key.
155
    Str(Cow<'a, str>),
156
157
    /// A key that is a digest.
158
    Digest(DigestInfo),
159
}
160
161
impl<'a> StoreKey<'a> {
162
    /// Creates a new store key from a string.
163
0
    pub const fn new_str(s: &'a str) -> Self {
164
0
        StoreKey::Str(Cow::Borrowed(s))
165
0
    }
166
167
    /// Returns a shallow clone of the key.
168
    /// This is extremely cheap and should be used when clone
169
    /// is needed but the key is not going to be modified.
170
16.7k
    pub fn borrow(&'a self) -> StoreKey<'a> {
171
44
        match self {
172
31
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Borrowed(s)),
173
13
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Borrowed(s)),
174
16.7k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
175
        }
176
16.7k
    }
177
178
    /// Converts the key into an owned version. This is useful
179
    /// when the caller needs an owned version of the key.
180
11.6k
    pub fn into_owned(self) -> StoreKey<'static> {
181
38
        match self {
182
0
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Owned(s)),
183
38
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Owned(s.to_owned())),
184
11.6k
            StoreKey::Digest(d) => StoreKey::Digest(d),
185
        }
186
11.6k
    }
187
188
    /// Converts the key into a digest. This is useful when the caller
189
    /// must have a digest key. If the data is not a digest, it may
190
    /// hash the underlying key and return a digest of the hash of the key
191
176
    pub fn into_digest(self) -> DigestInfo {
192
176
        match self {
193
176
            StoreKey::Digest(digest) => digest,
194
0
            StoreKey::Str(s) => {
195
0
                let mut hasher = DigestHasherFunc::Blake3.hasher();
196
0
                hasher.update(s.as_bytes());
197
0
                hasher.finalize_digest()
198
            }
199
        }
200
176
    }
201
202
    /// Returns the key as a string. If the key is a digest, it will
203
    /// return a string representation of the digest. If the key is a string,
204
    /// it will return the string itself.
205
37
    pub fn as_str(&'a self) -> Cow<'a, str> {
206
7
        match self {
207
7
            StoreKey::Str(Cow::Owned(s)) => Cow::Borrowed(s),
208
0
            StoreKey::Str(Cow::Borrowed(s)) => Cow::Borrowed(s),
209
30
            StoreKey::Digest(d) => Cow::Owned(format!("{d}")),
210
        }
211
37
    }
212
}
213
214
impl Clone for StoreKey<'static> {
215
5.72k
    fn clone(&self) -> Self {
216
5.72k
        match self {
217
17
            StoreKey::Str(s) => StoreKey::Str(s.clone()),
218
5.70k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
219
        }
220
5.72k
    }
221
}
222
223
impl PartialOrd for StoreKey<'_> {
224
7
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
225
7
        Some(self.cmp(other))
226
7
    }
227
}
228
229
impl Ord for StoreKey<'_> {
230
24
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
231
24
        match (self, other) {
232
24
            (StoreKey::Str(a), StoreKey::Str(b)) => a.cmp(b),
233
0
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a.cmp(b),
234
0
            (StoreKey::Str(_), StoreKey::Digest(_)) => std::cmp::Ordering::Less,
235
0
            (StoreKey::Digest(_), StoreKey::Str(_)) => std::cmp::Ordering::Greater,
236
        }
237
24
    }
238
}
239
240
impl PartialEq for StoreKey<'_> {
241
5.49k
    fn eq(&self, other: &Self) -> bool {
242
5.49k
        match (self, other) {
243
34
            (StoreKey::Str(a), StoreKey::Str(b)) => a == b,
244
5.46k
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a == b,
245
0
            _ => false,
246
        }
247
5.49k
    }
248
}
249
250
impl Hash for StoreKey<'_> {
251
24.9k
    fn hash<H: Hasher>(&self, state: &mut H) {
252
        /// Salts the hash with the enum value that represents
253
        /// the type of the key.
254
        #[repr(u8)]
255
        enum HashId {
256
            Str = 0,
257
            Digest = 1,
258
        }
259
24.9k
        match self {
260
37
            StoreKey::Str(s) => {
261
37
                (HashId::Str as u8).hash(state);
262
37
                s.hash(state);
263
37
            }
264
24.8k
            StoreKey::Digest(d) => {
265
24.8k
                (HashId::Digest as u8).hash(state);
266
24.8k
                d.hash(state);
267
24.8k
            }
268
        }
269
24.9k
    }
270
}
271
272
impl<'a> From<&'a str> for StoreKey<'a> {
273
0
    fn from(s: &'a str) -> Self {
274
0
        StoreKey::Str(Cow::Borrowed(s))
275
0
    }
276
}
277
278
impl From<String> for StoreKey<'static> {
279
0
    fn from(s: String) -> Self {
280
0
        StoreKey::Str(Cow::Owned(s))
281
0
    }
282
}
283
284
/// Wraps a digest as a digest key.
impl From<DigestInfo> for StoreKey<'_> {
    fn from(d: DigestInfo) -> Self {
        Self::Digest(d)
    }
}
289
290
/// Copies the referenced digest into a digest key.
impl From<&DigestInfo> for StoreKey<'_> {
    fn from(d: &DigestInfo) -> Self {
        Self::Digest(*d)
    }
}
295
296
1
#[derive(Clone, MetricsComponent)]
297
#[repr(transparent)]
298
pub struct Store {
299
    #[metric]
300
    inner: Arc<dyn StoreDriver>,
301
}
302
303
impl Store {
304
237
    pub fn new(inner: Arc<dyn StoreDriver>) -> Self {
305
237
        Self { inner }
306
237
    }
307
}
308
309
impl Store {
310
    /// Returns the immediate inner store driver.
311
    /// Note: This does not recursively try to resolve underlying store drivers
312
    /// like `.inner_store()` does.
313
    #[inline]
314
1
    pub fn into_inner(self) -> Arc<dyn StoreDriver> {
315
1
        self.inner
316
1
    }
317
318
    /// Gets the underlying store for the given digest.
319
    /// A caller might want to use this to obtain a reference to the "real" underlying store
320
    /// (if applicable) and check if it implements some special traits that allow optimizations.
321
    /// Note: If the store performs complex operations on the data, it should return itself.
322
    #[inline]
323
88
    pub fn inner_store<'a, K: Into<StoreKey<'a>>>(&self, digest: Option<K>) -> &dyn StoreDriver {
324
88
        self.inner.inner_store(digest.map(Into::into))
325
88
    }
326
327
    /// Tries to cast the underlying store to the given type.
328
    #[inline]
329
55
    pub fn downcast_ref<U: StoreDriver>(&self, maybe_digest: Option<StoreKey<'_>>) -> Option<&U> {
330
55
        self.inner.inner_store(maybe_digest).as_any().downcast_ref()
331
55
    }
332
333
    /// Register health checks used to monitor the store.
334
    #[inline]
335
0
    pub fn register_health(&self, registry: &mut HealthRegistryBuilder) {
336
0
        self.inner.clone().register_health(registry);
337
0
    }
338
}
339
340
impl StoreLike for Store {
341
    #[inline]
342
9.98k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
343
9.98k
        self.inner.as_ref()
344
9.98k
    }
345
346
7
    fn as_pin(&self) -> Pin<&Self> {
347
7
        Pin::new(self)
348
7
    }
349
}
350
351
impl<T> StoreLike for T
352
where
353
    T: StoreDriver + Sized,
354
{
355
    #[inline]
356
7.90k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
357
7.90k
        self
358
7.90k
    }
359
360
54
    fn as_pin(&self) -> Pin<&Self> {
361
54
        Pin::new(self)
362
54
    }
363
}
364
365
pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
366
    /// Returns the immediate inner store driver.
367
    fn as_store_driver(&self) -> &'_ dyn StoreDriver;
368
369
    /// Utility function to return a pinned reference to self.
370
    fn as_pin(&self) -> Pin<&Self>;
371
372
    /// Utility function to return a pinned reference to the store driver.
373
    #[inline]
374
17.8k
    fn as_store_driver_pin(&self) -> Pin<&'_ dyn StoreDriver> {
375
17.8k
        Pin::new(self.as_store_driver())
376
17.8k
    }
377
378
    /// Look up a digest in the store and return None if it does not exist in
379
    /// the store, or Some(size) if it does.
380
    /// Note: On an AC store the size will be incorrect and should not be used!
381
    #[inline]
382
212
    fn has<'a>(
383
212
        &'a self,
384
212
        digest: impl Into<StoreKey<'a>>,
385
212
    ) -> impl Future<Output = Result<Option<u64>, Error>> + 'a {
386
212
        self.as_store_driver_pin().has(digest.into())
387
212
    }
388
389
    /// Look up a list of digests in the store and return a result for each in
390
    /// the same order as input.  The result will either be None if it does not
391
    /// exist in the store, or Some(size) if it does.
392
    /// Note: On an AC store the size will be incorrect and should not be used!
393
    #[inline]
394
19
    fn has_many<'a>(
395
19
        &'a self,
396
19
        digests: &'a [StoreKey<'a>],
397
19
    ) -> impl Future<Output = Result<Vec<Option<u64>>, Error>> + Send + 'a {
398
19
        self.as_store_driver_pin().has_many(digests)
399
19
    }
400
401
    /// The implementation of the above has and has_many functions.  See their
402
    /// documentation for details.
403
    #[inline]
404
41
    fn has_with_results<'a>(
405
41
        &'a self,
406
41
        digests: &'a [StoreKey<'a>],
407
41
        results: &'a mut [Option<u64>],
408
41
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
409
41
        self.as_store_driver_pin()
410
41
            .has_with_results(digests, results)
411
41
    }
412
413
    /// List all the keys in the store that are within the given range.
414
    /// `handler` is called for each key in the range. If `handler` returns
415
    /// false, the listing is stopped.
416
    ///
417
    /// The number of keys passed through the handler is the return value.
418
    #[inline]
419
7
    fn list<'a, 'b>(
420
7
        &'a self,
421
7
        range: impl RangeBounds<StoreKey<'b>> + Send + 'b,
422
7
        mut handler: impl for<'c> FnMut(&'c StoreKey) -> bool + Send + Sync + 'a,
423
7
    ) -> impl Future<Output = Result<u64, Error>> + Send + 'a
424
7
    where
425
7
        'b: 'a,
426
7
    {
427
        // Note: We use a manual async move, so the future can own the `range` and `handler`,
428
        // otherwise we'd require the caller to pass them in by reference making more borrow
429
        // checker noise.
430
7
        async move {
431
7
            self.as_store_driver_pin()
432
7
                .list(
433
7
                    (
434
7
                        range.start_bound().map(StoreKey::borrow),
435
7
                        range.end_bound().map(StoreKey::borrow),
436
7
                    ),
437
7
                    &mut handler,
438
7
                )
439
0
                .await
440
7
        }
441
7
    }
442
443
    /// Sends the data to the store.
444
    #[inline]
445
5.14k
    fn update<'a>(
446
5.14k
        &'a self,
447
5.14k
        digest: impl Into<StoreKey<'a>>,
448
5.14k
        reader: DropCloserReadHalf,
449
5.14k
        upload_size: UploadSizeInfo,
450
5.14k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
451
5.14k
        self.as_store_driver_pin()
452
5.14k
            .update(digest.into(), reader, upload_size)
453
5.14k
    }
454
455
    /// Any optimizations the store might want to expose to the callers.
456
    /// By default, no optimizations are exposed.
457
    #[inline]
458
6
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
459
6
        self.as_store_driver_pin().optimized_for(optimization)
460
6
    }
461
462
    /// Specialized version of `.update()` which takes a `ResumeableFileSlot`.
463
    /// This is useful if the underlying store can optimize the upload process
464
    /// when it knows the data is coming from a file.
465
    #[inline]
466
7
    fn update_with_whole_file<'a>(
467
7
        &'a self,
468
7
        digest: impl Into<StoreKey<'a>>,
469
7
        file: fs::ResumeableFileSlot,
470
7
        upload_size: UploadSizeInfo,
471
7
    ) -> impl Future<Output = Result<Option<fs::ResumeableFileSlot>, Error>> + Send + 'a {
472
7
        self.as_store_driver_pin()
473
7
            .update_with_whole_file(digest.into(), file, upload_size)
474
7
    }
475
476
    /// Utility to send all the data to the store when you have all the bytes.
477
    #[inline]
478
5.66k
    fn update_oneshot<'a>(
479
5.66k
        &'a self,
480
5.66k
        digest: impl Into<StoreKey<'a>>,
481
5.66k
        data: Bytes,
482
5.66k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
483
5.66k
        self.as_store_driver_pin()
484
5.66k
            .update_oneshot(digest.into(), data)
485
5.66k
    }
486
487
    /// Retrieves part of the data from the store and writes it to the given writer.
488
    #[inline]
489
1.27k
    fn get_part<'a>(
490
1.27k
        &'a self,
491
1.27k
        digest: impl Into<StoreKey<'a>>,
492
1.27k
        mut writer: impl BorrowMut<DropCloserWriteHalf> + Send + 'a,
493
1.27k
        offset: u64,
494
1.27k
        length: Option<u64>,
495
1.27k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
496
1.27k
        let key = digest.into();
497
        // Note: We need to capture `writer` just in case the caller
498
        // expects the drop() method to be called on it when the future
499
        // is done due to the complex interaction between the DropCloserWriteHalf
500
        // and the DropCloserReadHalf during drop().
501
1.27k
        async move {
502
1.27k
            self.as_store_driver_pin()
503
1.27k
                .get_part(key, writer.borrow_mut(), offset, length)
504
138
                .await
505
1.27k
        }
506
1.27k
    }
507
508
    /// Utility that works the same as `.get_part()`, but writes all the data.
509
    #[inline]
510
15
    fn get<'a>(
511
15
        &'a self,
512
15
        key: impl Into<StoreKey<'a>>,
513
15
        writer: DropCloserWriteHalf,
514
15
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
515
15
        self.as_store_driver_pin().get(key.into(), writer)
516
15
    }
517
518
    /// Utility that will return all the bytes at once instead of in a streaming manner.
519
    #[inline]
520
5.34k
    fn get_part_unchunked<'a>(
521
5.34k
        &'a self,
522
5.34k
        key: impl Into<StoreKey<'a>>,
523
5.34k
        offset: u64,
524
5.34k
        length: Option<u64>,
525
5.34k
    ) -> impl Future<Output = Result<Bytes, Error>> + Send + 'a {
526
5.34k
        self.as_store_driver_pin()
527
5.34k
            .get_part_unchunked(key.into(), offset, length)
528
5.34k
    }
529
530
    /// Default implementation of the health check. Some stores may want to override this
531
    /// in situations where the default implementation is not sufficient.
532
    #[inline]
533
0
    fn check_health(
534
0
        &self,
535
0
        namespace: Cow<'static, str>,
536
0
    ) -> impl Future<Output = HealthStatus> + Send {
537
0
        self.as_store_driver_pin().check_health(namespace)
538
0
    }
539
}
540
541
#[async_trait]
542
pub trait StoreDriver:
543
    Sync + Send + Unpin + MetricsComponent + HealthStatusIndicator + 'static
544
{
545
    /// See: [`StoreLike::has`] for details.
546
    #[inline]
547
220
    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
548
220
        let mut result = [None];
549
220
        self.has_with_results(&[key], &mut result).
await67
?0
;
550
220
        Ok(result[0])
551
440
    }
552
553
    /// See: [`StoreLike::has_many`] for details.
554
    #[inline]
555
    async fn has_many(
556
        self: Pin<&Self>,
557
        digests: &[StoreKey<'_>],
558
19
    ) -> Result<Vec<Option<u64>>, Error> {
559
19
        let mut results = vec![None; digests.len()];
560
19
        self.has_with_results(digests, &mut results).
await12
?0
;
561
19
        Ok(results)
562
38
    }
563
564
    /// See: [`StoreLike::has_with_results`] for details.
565
    async fn has_with_results(
566
        self: Pin<&Self>,
567
        digests: &[StoreKey<'_>],
568
        results: &mut [Option<u64>],
569
    ) -> Result<(), Error>;
570
571
    /// See: [`StoreLike::list`] for details.
572
    async fn list(
573
        self: Pin<&Self>,
574
        _range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
575
        _handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
576
0
    ) -> Result<u64, Error> {
577
        // TODO(allada) We should force all stores to implement this function instead of
578
        // providing a default implementation.
579
0
        Err(make_err!(
580
0
            Code::Unimplemented,
581
0
            "Store::list() not implemented for this store"
582
0
        ))
583
0
    }
584
585
    /// See: [`StoreLike::update`] for details.
586
    async fn update(
587
        self: Pin<&Self>,
588
        key: StoreKey<'_>,
589
        reader: DropCloserReadHalf,
590
        upload_size: UploadSizeInfo,
591
    ) -> Result<(), Error>;
592
593
    /// See: [`StoreLike::optimized_for`] for details.
594
46
    fn optimized_for(&self, _optimization: StoreOptimizations) -> bool {
595
46
        false
596
46
    }
597
598
    /// See: [`StoreLike::update_with_whole_file`] for details.
599
    async fn update_with_whole_file(
600
        self: Pin<&Self>,
601
        key: StoreKey<'_>,
602
        mut file: fs::ResumeableFileSlot,
603
        upload_size: UploadSizeInfo,
604
1
    ) -> Result<Option<fs::ResumeableFileSlot>, Error> {
605
1
        let inner_store = self.inner_store(Some(key.borrow()));
606
1
        if inner_store.optimized_for(StoreOptimizations::FileUpdates) {
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [Folded - Ignored]
  Branch (606:12): [True: 0, False: 1]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [Folded - Ignored]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [True: 0, False: 0]
607
0
            error_if!(
608
0
                addr_eq(inner_store, &*self),
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [Folded - Ignored]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [Folded - Ignored]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
  Branch (608:17): [True: 0, False: 0]
609
                "Store::inner_store() returned self when optimization present"
610
            );
611
0
            return Pin::new(inner_store)
612
0
                .update_with_whole_file(key, file, upload_size)
613
0
                .await;
614
1
        }
615
67
        slow_update_store_with_file(self, key, &mut file, upload_size).await
?0
;
616
1
        Ok(Some(file))
617
2
    }
618
619
    /// See: [`StoreLike::update_oneshot`] for details.
620
5.73k
    async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> {
621
        // TODO(blaise.bruer) This is extremely inefficient, since we have exactly
622
        // what we need here. Maybe we could instead make a version of the stream
623
        // that can take objects already fully in memory instead?
624
5.73k
        let (mut tx, rx) = make_buf_channel_pair();
625
626
5.73k
        let data_len =
627
5.73k
            u64::try_from(data.len()).err_tip(|| 
"Could not convert data.len() to u64"0
)
?0
;
628
5.73k
        let send_fut = async move {
629
5.73k
            // Only send if we are not EOF.
630
5.73k
            if !data.is_empty() {
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 31, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [Folded - Ignored]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 1, False: 0]
  Branch (630:16): [True: 16, False: 40]
  Branch (630:16): [True: 8, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 4, False: 1]
  Branch (630:16): [True: 7, False: 0]
  Branch (630:16): [True: 104, False: 0]
  Branch (630:16): [True: 2, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 2, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 4, False: 0]
  Branch (630:16): [True: 1, False: 0]
  Branch (630:16): [True: 2, False: 0]
  Branch (630:16): [True: 2, False: 0]
  Branch (630:16): [True: 2, False: 0]
  Branch (630:16): [True: 2, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 1, False: 0]
  Branch (630:16): [True: 12, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 406, False: 1]
  Branch (630:16): [Folded - Ignored]
  Branch (630:16): [True: 3, False: 2]
  Branch (630:16): [True: 2, False: 0]
  Branch (630:16): [True: 1, False: 0]
  Branch (630:16): [True: 13, False: 1]
  Branch (630:16): [True: 25, False: 13]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 1, False: 0]
  Branch (630:16): [True: 5.00k, False: 0]
  Branch (630:16): [True: 6, False: 0]
  Branch (630:16): [True: 4, False: 0]
  Branch (630:16): [True: 2, False: 0]
  Branch (630:16): [True: 0, False: 0]
  Branch (630:16): [True: 7, False: 0]
631
5.67k
                tx.send(data)
632
38
                    .await
633
5.67k
                    .err_tip(|| 
"Failed to write data in update_oneshot"0
)
?0
;
634
58
            }
635
5.73k
            tx.send_eof()
636
5.73k
                .err_tip(|| 
"Failed to write EOF in update_oneshot"0
)
?0
;
637
5.73k
            Ok(())
638
5.73k
        };
639
5.73k
        try_join!(
640
5.73k
            send_fut,
641
5.73k
            self.update(key, rx, UploadSizeInfo::ExactSize(data_len))
642
5.73k
        )
?2
;
643
5.73k
        Ok(())
644
11.4k
    }
645
646
    /// See: [`StoreLike::get_part`] for details.
    ///
    /// Required method: the trait supplies no default body. The default `get`
    /// and `get_part_unchunked` methods of this trait are built on this call.
    ///
    /// * `key` - identifies the object to read.
    /// * `writer` - write half that the implementation sends the data to.
    /// * `offset` / `length` - the requested range; `get` passes `0` and
    ///   `None`. NOTE(review): `None` presumably means "until the end of the
    ///   object" -- confirm against [`StoreLike::get_part`].
647
    async fn get_part(
648
        self: Pin<&Self>,
649
        key: StoreKey<'_>,
650
        writer: &mut DropCloserWriteHalf,
651
        offset: u64,
652
        length: Option<u64>,
653
    ) -> Result<(), Error>;
654
655
    /// See: [`StoreLike::get`] for details.
656
    #[inline]
657
    async fn get(
658
        self: Pin<&Self>,
659
        key: StoreKey<'_>,
660
        mut writer: DropCloserWriteHalf,
661
19
    ) -> Result<(), Error> {
662
1.08k
        
self.get_part(key, &mut writer, 0, None)19
.await
663
37
    }
664
665
    /// See: [`StoreLike::get_part_unchunked`] for details.
666
    async fn get_part_unchunked(
667
        self: Pin<&Self>,
668
        key: StoreKey<'_>,
669
        offset: u64,
670
        length: Option<u64>,
671
5.42k
    ) -> Result<Bytes, Error> {
672
5.42k
        let length_usize = length
673
5.42k
            .map(|v| 
usize::try_from(v).err_tip(2.21k
||
"Could not convert length to usize"0
)2.21k
)
674
5.42k
            .transpose()
?0
;
675
676
        // TODO(blaise.bruer) This is extremely inefficient, since we have exactly
677
        // what we need here. Maybe we could instead make a version of the stream
678
        // that can take objects already fully in memory instead?
679
5.42k
        let (mut tx, mut rx) = make_buf_channel_pair();
680
681
5.42k
        let (data_res, get_part_res) = join!(
682
5.42k
            rx.consume(length_usize),
683
5.42k
            // We use a closure here to ensure that the `tx` is dropped when the
684
5.42k
            // future is done.
685
5.42k
            async move { self.get_part(key, &mut tx, offset, length).
await4.08k
},
686
5.42k
        );
687
5.42k
        get_part_res
688
5.42k
            .err_tip(|| 
"Failed to get_part in get_part_unchunked"9
)
689
5.42k
            .merge(data_res.err_tip(|| 
"Failed to read stream to completion in get_part_unchunked"9
))
690
10.8k
    }
691
692
    /// See: [`StoreLike::check_health`] for details.
693
0
    async fn check_health(self: Pin<&Self>, namespace: Cow<'static, str>) -> HealthStatus {
694
0
        let digest_data_size = default_digest_size_health_check();
695
0
        let mut digest_data = vec![0u8; digest_data_size];
696
0
697
0
        let mut namespace_hasher = StdHasher::new();
698
0
        namespace.hash(&mut namespace_hasher);
699
0
        self.get_name().hash(&mut namespace_hasher);
700
0
        let hash_seed = namespace_hasher.finish();
701
0
702
0
        // Fill the digest data with random data based on a stable
703
0
        // hash of the namespace and store name. Intention is to
704
0
        // have randomly filled data that is unique per store and
705
0
        // does not change between health checks. This is to ensure
706
0
        // we are not adding more data to store on each health check.
707
0
        let mut rng: StdRng = StdRng::seed_from_u64(hash_seed);
708
0
        rng.fill_bytes(&mut digest_data);
709
0
710
0
        let mut digest_hasher = default_digest_hasher_func().hasher();
711
0
        digest_hasher.update(&digest_data);
712
0
        let digest_data_len = digest_data.len() as u64;
713
0
        let digest_info = StoreKey::from(digest_hasher.finalize_digest());
714
0
715
0
        let digest_bytes = bytes::Bytes::copy_from_slice(&digest_data);
716
717
0
        if let Err(e) = self
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [Folded - Ignored]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [Folded - Ignored]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
  Branch (717:16): [True: 0, False: 0]
718
0
            .update_oneshot(digest_info.borrow(), digest_bytes.clone())
719
0
            .await
720
        {
721
0
            return HealthStatus::new_failed(
722
0
                self.get_ref(),
723
0
                format!("Store.update_oneshot() failed: {e}").into(),
724
0
            );
725
0
        }
726
0
727
0
        match self.has(digest_info.borrow()).await {
728
0
            Ok(Some(s)) => {
729
0
                if s != digest_data_len {
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [Folded - Ignored]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [Folded - Ignored]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
  Branch (729:20): [True: 0, False: 0]
730
0
                    return HealthStatus::new_failed(
731
0
                        self.get_ref(),
732
0
                        format!("Store.has() size mismatch {s} != {digest_data_len}").into(),
733
0
                    );
734
0
                }
735
            }
736
            Ok(None) => {
737
0
                return HealthStatus::new_failed(
738
0
                    self.get_ref(),
739
0
                    "Store.has() size not found".into(),
740
0
                );
741
            }
742
0
            Err(e) => {
743
0
                return HealthStatus::new_failed(
744
0
                    self.get_ref(),
745
0
                    format!("Store.has() failed: {e}").into(),
746
0
                );
747
            }
748
        }
749
750
0
        match self
751
0
            .get_part_unchunked(digest_info, 0, Some(digest_data_len))
752
0
            .await
753
        {
754
0
            Ok(b) => {
755
0
                if b != digest_bytes {
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [Folded - Ignored]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [Folded - Ignored]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
  Branch (755:20): [True: 0, False: 0]
756
0
                    return HealthStatus::new_failed(
757
0
                        self.get_ref(),
758
0
                        "Store.get_part_unchunked() data mismatch".into(),
759
0
                    );
760
0
                }
761
            }
762
0
            Err(e) => {
763
0
                return HealthStatus::new_failed(
764
0
                    self.get_ref(),
765
0
                    format!("Store.get_part_unchunked() failed: {e}").into(),
766
0
                );
767
            }
768
        }
769
770
0
        HealthStatus::new_ok(self.get_ref(), "Successfully store health check".into())
771
0
    }
772
773
    /// See: [`Store::inner_store`] for details.
    ///
    /// Required method returning a borrowed [`StoreDriver`] trait object.
    /// NOTE(review): the optional `_digest` presumably lets a wrapping store
    /// pick which inner driver to return for that key -- confirm against
    /// [`Store::inner_store`].
774
    fn inner_store(&self, _digest: Option<StoreKey<'_>>) -> &dyn StoreDriver;
775
776
    /// Returns an Any variation of whatever Self is.
    ///
    /// The result is a borrowed `dyn Any + Sync + Send + 'static`, which
    /// callers can use to attempt a downcast to a concrete type.
777
    fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static);
778
    /// Owned counterpart of `as_any`: consumes the `Arc<Self>` and returns it
    /// as an `Arc<dyn Any + Sync + Send + 'static>`.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static>;
779
780
    // Register health checks used to monitor the store.
    //
    // The default implementation is a no-op that ignores `_registry`.
    // NOTE(review): implementors presumably override this to add themselves
    // to the registry -- confirm against the driver implementations.
781
0
    fn register_health(self: Arc<Self>, _registry: &mut HealthRegistryBuilder) {}
782
}
783
784
/// The instructions on how to decode a value from a Bytes & version into
785
/// the underlying type.
///
/// `SchedulerStore::search_by_index_prefix` and
/// `SchedulerStore::get_and_decode` require this bound on their key type and
/// return values of [`Self::DecodeOutput`].
786
pub trait SchedulerStoreDecodeTo {
787
    /// The type produced by a successful [`Self::decode`].
    type DecodeOutput;
788
    /// Decodes `data`, together with its `version`, into
    /// [`Self::DecodeOutput`], or returns an [`Error`].
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error>;
789
}
790
791
/// A subscription handle; the associated `Subscription` type of
/// `SchedulerSubscriptionManager` must implement this trait.
pub trait SchedulerSubscription: Send + Sync {
792
    /// Returns a `Send` future that yields `Ok(())` or an [`Error`].
    /// NOTE(review): presumably resolves once the subscribed key has changed
    /// -- confirm against implementations.
    fn changed(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
793
}
794
795
pub trait SchedulerSubscriptionManager: Send + Sync {
796
    type Subscription: SchedulerSubscription;
797
798
    fn notify_for_test(&self, value: String);
799
800
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
801
    where
802
        K: SchedulerStoreKeyProvider;
803
}
804
805
/// The API surface for a scheduler store.
806
pub trait SchedulerStore: Send + Sync + 'static {
807
    type SubscriptionManager: SchedulerSubscriptionManager;
808
809
    /// Returns the subscription manager for the scheduler store.
810
    fn subscription_manager(&self) -> Result<Arc<Self::SubscriptionManager>, Error>;
811
812
    /// Updates or inserts an entry into the underlying store.
813
    /// Metadata about the key is attached to the compile-time type.
814
    /// If StoreKeyProvider::Versioned is TrueValue, the data will not
815
    /// be updated if the current version in the database does not match
816
    /// the version in the passed in data.
817
    /// No guarantees are made about when Version is FalseValue.
818
    /// Indexes are guaranteed to be updated atomically with the data.
819
    fn update_data<T>(&self, data: T) -> impl Future<Output = Result<Option<u64>, Error>> + Send
820
    where
821
        T: SchedulerStoreDataProvider
822
            + SchedulerStoreKeyProvider
823
            + SchedulerCurrentVersionProvider
824
            + Send;
825
826
    /// Searches for all keys in the store that match the given index prefix.
827
    fn search_by_index_prefix<K>(
828
        &self,
829
        index: K,
830
    ) -> impl Future<
831
        Output = Result<
832
            impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
833
            Error,
834
        >,
835
    > + Send
836
    where
837
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send;
838
839
    /// Returns data for the provided key with the given version if
840
    /// StoreKeyProvider::Versioned is TrueValue.
841
    fn get_and_decode<K>(
842
        &self,
843
        key: K,
844
    ) -> impl Future<Output = Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>> + Send
845
    where
846
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send;
847
}
848
849
/// A type that is used to let the scheduler store know what
850
/// index is being requested.
851
pub trait SchedulerIndexProvider {
852
    /// Only keys inserted with this prefix will be indexed.
853
    const KEY_PREFIX: &'static str;
854
855
    /// The name of the index.
856
    const INDEX_NAME: &'static str;
857
858
    /// If the data is versioned.
859
    type Versioned: BoolValue;
860
861
    /// The prefix of the index value to match, as used by
    /// `SchedulerStore::search_by_index_prefix`.
862
    fn index_value_prefix(&self) -> Cow<'_, str>;
863
}
864
865
/// Provides a key to lookup data in the store.
866
pub trait SchedulerStoreKeyProvider {
867
    /// If the data is versioned. Expressed as a compile-time boolean
    /// (`TrueValue` or `FalseValue`).
868
    type Versioned: BoolValue;
869
870
    /// Returns the key for the data, as an owned (`'static`) [`StoreKey`].
871
    fn get_key(&self) -> StoreKey<'static>;
872
}
873
874
/// Provides data to be stored in the scheduler store.
875
pub trait SchedulerStoreDataProvider {
876
    /// Converts the data into bytes to be stored in the store.
877
    fn try_into_bytes(self) -> Result<Bytes, Error>;
878
879
    /// Returns the indexes for the data if any.
880
1
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
881
1
        Ok(Vec::new())
882
1
    }
883
}
884
885
/// Provides the current version of the data in the store.
///
/// Required by `SchedulerStore::update_data`. A blanket implementation in
/// this file returns `0` for every key type whose `Versioned` is `FalseValue`.
886
pub trait SchedulerCurrentVersionProvider {
887
    /// Returns the current version of the data in the store.
888
    fn current_version(&self) -> u64;
889
}
890
891
/// Default implementation for when we are not providing a version
892
/// for the data.
893
impl<T> SchedulerCurrentVersionProvider for T
894
where
895
    T: SchedulerStoreKeyProvider<Versioned = FalseValue>,
896
{
897
0
    fn current_version(&self) -> u64 {
898
0
        0
899
0
    }
900
}
901
902
/// Compile time types for booleans.
///
/// Implemented in this file by `TrueValue` and `FalseValue`, and used as the
/// bound on the `Versioned` associated types.
903
pub trait BoolValue {
904
    /// The boolean this type stands for.
    const VALUE: bool;
905
}
906
/// Compile time check if something is false.
///
/// Marker trait with no items; implemented in this file by `FalseValue`.
907
pub trait IsFalse {}
908
/// Compile time check if something is true.
///
/// Marker trait with no items; implemented in this file by `TrueValue`.
909
pub trait IsTrue {}
910
911
/// Compile time true value.
///
/// Unit struct used purely at the type level: `VALUE` is `true` and it
/// carries the `IsTrue` marker.
912
pub struct TrueValue;
913
impl BoolValue for TrueValue {
914
    const VALUE: bool = true;
915
}
916
impl IsTrue for TrueValue {}
917
918
/// Compile time false value.
///
/// Unit struct used purely at the type level: `VALUE` is `false` and it
/// carries the `IsFalse` marker.
919
pub struct FalseValue;
920
impl BoolValue for FalseValue {
921
    const VALUE: bool = false;
922
}
923
impl IsFalse for FalseValue {}