Coverage Report

Created: 2024-11-20 10:13

/build/source/nativelink-util/src/store_trait.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::{BorrowMut, Cow};
16
use std::collections::hash_map::DefaultHasher as StdHasher;
17
use std::convert::Into;
18
use std::hash::{Hash, Hasher};
19
use std::ops::{Bound, RangeBounds};
20
use std::pin::Pin;
21
use std::ptr::addr_eq;
22
use std::sync::{Arc, OnceLock};
23
24
use async_trait::async_trait;
25
use bytes::{Bytes, BytesMut};
26
use futures::future::{select, Either};
27
use futures::{join, try_join, Future, FutureExt, Stream};
28
use nativelink_error::{error_if, make_err, Code, Error, ResultExt};
29
use nativelink_metric::MetricsComponent;
30
use rand::rngs::StdRng;
31
use rand::{RngCore, SeedableRng};
32
use serde::{Deserialize, Serialize};
33
use tokio::io::AsyncSeekExt;
34
use tokio::time::timeout;
35
36
use crate::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
37
use crate::common::DigestInfo;
38
use crate::digest_hasher::{default_digest_hasher_func, DigestHasher, DigestHasherFunc};
39
use crate::fs::{self, idle_file_descriptor_timeout};
40
use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
41
42
static DEFAULT_DIGEST_SIZE_HEALTH_CHECK: OnceLock<usize> = OnceLock::new();
43
/// Default digest size for health check data. Any change in this value
44
/// changes the default contract. `GlobalConfig` should be updated to reflect
45
/// changes in this value.
46
pub const DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG: usize = 1024 * 1024;
47
48
// Get the default digest size for health check data, if value is unset a system wide default is used.
49
0
pub fn default_digest_size_health_check() -> usize {
50
0
    *DEFAULT_DIGEST_SIZE_HEALTH_CHECK.get_or_init(|| DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG)
51
0
}
52
53
/// Set the default digest size for health check data, this should be called once.
54
0
pub fn set_default_digest_size_health_check(size: usize) -> Result<(), Error> {
55
0
    DEFAULT_DIGEST_SIZE_HEALTH_CHECK.set(size).map_err(|_| {
56
0
        make_err!(
57
0
            Code::Internal,
58
0
            "set_default_digest_size_health_check already set"
59
0
        )
60
0
    })
61
0
}
62
63
2.45k
#[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)]
64
pub enum UploadSizeInfo {
65
    /// When the data transfer amount is known to be exact size, this enum should be used.
66
    /// The receiver store can use this to better optimize the way the data is sent or stored.
67
    ExactSize(u64),
68
69
    /// When the data transfer amount is not known to be exact, the caller should use this enum
70
    /// to provide the maximum size that could possibly be sent. This will bypass the exact size
71
    /// checks, but still provide useful information to the underlying store about the data being
72
    /// sent that it can then use to optimize the upload process.
73
    MaxSize(u64),
74
}
75
76
/// Utility to send all the data to the store from a file.
77
// Note: This is not inlined because some code may want to bypass any underlying
78
// optimizations that may be present in the inner store.
79
4
pub async fn slow_update_store_with_file<S: StoreDriver + ?Sized>(
80
4
    store: Pin<&S>,
81
4
    digest: impl Into<StoreKey<'_>>,
82
4
    file: &mut fs::ResumeableFileSlot,
83
4
    upload_size: UploadSizeInfo,
84
4
) -> Result<(), Error> {
85
4
    file.as_writer()
86
0
        .await
87
4
        .err_tip(|| 
"Failed to get writer in upload_file_to_store"0
)
?0
88
4
        .rewind()
89
4
        .await
90
4
        .err_tip(|| 
"Failed to rewind in upload_file_to_store"0
)
?0
;
91
4
    let (tx, rx) = make_buf_channel_pair();
92
4
93
4
    let mut update_fut = store
94
4
        .update(digest.into(), rx, upload_size)
95
4
        .map(|r| r.err_tip(|| 
"Could not upload data to store in upload_file_to_store"0
));
96
4
    let read_result = {
97
4
        let read_data_fut = async {
98
4
            let (_, mut tx) = file
99
4
                .read_buf_cb(
100
4
                    (BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx),
101
67
                    move |(chunk, mut tx)| async move {
102
67
                        tx.send(chunk.freeze())
103
0
                            .await
104
67
                            .err_tip(|| 
"Failed to send in upload_file_to_store"0
)
?0
;
105
67
                        Ok((BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx))
106
134
                    },
107
4
                )
108
74
                .await
109
4
                .err_tip(|| 
"Error in upload_file_to_store::read_buf_cb section"0
)
?0
;
110
4
            tx.send_eof()
111
4
                .err_tip(|| 
"Could not send EOF to store in upload_file_to_store"0
)
?0
;
112
4
            Ok(())
113
4
        };
114
4
        tokio::pin!(read_data_fut);
115
74
        match select(&mut update_fut, read_data_fut).await {
116
0
            Either::Left((update_result, read_data_fut)) => {
117
0
                return update_result.merge(read_data_fut.await)
118
            }
119
4
            Either::Right((read_result, _)) => read_result,
120
        }
121
    };
122
4
    if let Ok(
update_result3
) = timeout(idle_file_descriptor_timeout(), &mut update_fut).
await1
{
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 2, False: 1]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [Folded - Ignored]
  Branch (122:12): [True: 1, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [Folded - Ignored]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
  Branch (122:12): [True: 0, False: 0]
123
3
        update_result.merge(read_result)
124
    } else {
125
1
        file.close_file()
126
1
            .await
127
1
            .err_tip(|| 
"Failed to close file in upload_file_to_store"0
)
?0
;
128
5
        
update_fut1
.await.
merge(read_result)1
129
    }
130
4
}
131
132
/// Optimizations that stores may want to expose to the callers.
133
/// This is useful for specific cases when the store can optimize the processing
134
/// of the data being processed.
135
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
136
pub enum StoreOptimizations {
137
    /// The store can optimize the upload process when it knows the data is coming from a file.
138
    FileUpdates,
139
140
    /// If the store will ignore the data uploads.
141
    NoopUpdates,
142
143
    /// If the store will never serve downloads.
144
    NoopDownloads,
145
}
146
147
/// Holds something that can be converted into a key the
148
/// store API can understand. Generally this is a digest
149
/// but it can also be a string if the caller wishes to
150
/// store the data directly and reference it by a string
151
/// directly.
152
#[derive(Debug, Eq)]
153
pub enum StoreKey<'a> {
154
    /// A string key.
155
    Str(Cow<'a, str>),
156
157
    /// A key that is a digest.
158
    Digest(DigestInfo),
159
}
160
161
impl<'a> StoreKey<'a> {
162
    /// Creates a new store key from a string.
163
0
    pub const fn new_str(s: &'a str) -> Self {
164
0
        StoreKey::Str(Cow::Borrowed(s))
165
0
    }
166
167
    /// Returns a shallow clone of the key.
168
    /// This is extremely cheap and should be used when clone
169
    /// is needed but the key is not going to be modified.
170
    #[must_use]
171
16.7k
    pub fn borrow(&'a self) -> StoreKey<'a> {
172
44
        match self {
173
31
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Borrowed(s)),
174
13
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Borrowed(s)),
175
16.7k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
176
        }
177
16.7k
    }
178
179
    /// Converts the key into an owned version. This is useful
180
    /// when the caller needs an owned version of the key.
181
11.6k
    pub fn into_owned(self) -> StoreKey<'static> {
182
38
        match self {
183
0
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Owned(s)),
184
38
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Owned(s.to_owned())),
185
11.6k
            StoreKey::Digest(d) => StoreKey::Digest(d),
186
        }
187
11.6k
    }
188
189
    /// Converts the key into a digest. This is useful when the caller
190
    /// must have a digest key. If the data is not a digest, it may
191
    /// hash the underlying key and return a digest of the hash of the key
192
176
    pub fn into_digest(self) -> DigestInfo {
193
176
        match self {
194
176
            StoreKey::Digest(digest) => digest,
195
0
            StoreKey::Str(s) => {
196
0
                let mut hasher = DigestHasherFunc::Blake3.hasher();
197
0
                hasher.update(s.as_bytes());
198
0
                hasher.finalize_digest()
199
            }
200
        }
201
176
    }
202
203
    /// Returns the key as a string. If the key is a digest, it will
204
    /// return a string representation of the digest. If the key is a string,
205
    /// it will return the string itself.
206
37
    pub fn as_str(&'a self) -> Cow<'a, str> {
207
7
        match self {
208
7
            StoreKey::Str(Cow::Owned(s)) => Cow::Borrowed(s),
209
0
            StoreKey::Str(Cow::Borrowed(s)) => Cow::Borrowed(s),
210
30
            StoreKey::Digest(d) => Cow::Owned(format!("{d}")),
211
        }
212
37
    }
213
}
214
215
impl Clone for StoreKey<'static> {
216
5.72k
    fn clone(&self) -> Self {
217
5.72k
        match self {
218
17
            StoreKey::Str(s) => StoreKey::Str(s.clone()),
219
5.70k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
220
        }
221
5.72k
    }
222
}
223
224
impl PartialOrd for StoreKey<'_> {
225
7
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
226
7
        Some(self.cmp(other))
227
7
    }
228
}
229
230
impl Ord for StoreKey<'_> {
231
24
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
232
24
        match (self, other) {
233
24
            (StoreKey::Str(a), StoreKey::Str(b)) => a.cmp(b),
234
0
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a.cmp(b),
235
0
            (StoreKey::Str(_), StoreKey::Digest(_)) => std::cmp::Ordering::Less,
236
0
            (StoreKey::Digest(_), StoreKey::Str(_)) => std::cmp::Ordering::Greater,
237
        }
238
24
    }
239
}
240
241
impl PartialEq for StoreKey<'_> {
242
5.51k
    fn eq(&self, other: &Self) -> bool {
243
5.51k
        match (self, other) {
244
36
            (StoreKey::Str(a), StoreKey::Str(b)) => a == b,
245
5.48k
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a == b,
246
0
            _ => false,
247
        }
248
5.51k
    }
249
}
250
251
impl Hash for StoreKey<'_> {
252
24.9k
    fn hash<H: Hasher>(&self, state: &mut H) {
253
        /// Salts the hash with the enum value that represents
254
        /// the type of the key.
255
        #[repr(u8)]
256
        enum HashId {
257
            Str = 0,
258
            Digest = 1,
259
        }
260
24.9k
        match self {
261
37
            StoreKey::Str(s) => {
262
37
                (HashId::Str as u8).hash(state);
263
37
                s.hash(state);
264
37
            }
265
24.8k
            StoreKey::Digest(d) => {
266
24.8k
                (HashId::Digest as u8).hash(state);
267
24.8k
                d.hash(state);
268
24.8k
            }
269
        }
270
24.9k
    }
271
}
272
273
impl<'a> From<&'a str> for StoreKey<'a> {
274
0
    fn from(s: &'a str) -> Self {
275
0
        StoreKey::Str(Cow::Borrowed(s))
276
0
    }
277
}
278
279
impl From<String> for StoreKey<'static> {
280
0
    fn from(s: String) -> Self {
281
0
        StoreKey::Str(Cow::Owned(s))
282
0
    }
283
}
284
285
impl<'a> From<DigestInfo> for StoreKey<'a> {
286
10.5k
    fn from(d: DigestInfo) -> Self {
287
10.5k
        StoreKey::Digest(d)
288
10.5k
    }
289
}
290
291
impl<'a> From<&DigestInfo> for StoreKey<'a> {
292
40
    fn from(d: &DigestInfo) -> Self {
293
40
        StoreKey::Digest(*d)
294
40
    }
295
}
296
297
1
#[derive(Clone, MetricsComponent)]
298
#[repr(transparent)]
299
pub struct Store {
300
    #[metric]
301
    inner: Arc<dyn StoreDriver>,
302
}
303
304
impl Store {
305
237
    pub fn new(inner: Arc<dyn StoreDriver>) -> Self {
306
237
        Self { inner }
307
237
    }
308
}
309
310
impl Store {
311
    /// Returns the immediate inner store driver.
312
    /// Note: This does not recursively try to resolve underlying store drivers
313
    /// like `.inner_store()` does.
314
    #[inline]
315
1
    pub fn into_inner(self) -> Arc<dyn StoreDriver> {
316
1
        self.inner
317
1
    }
318
319
    /// Gets the underlying store for the given digest.
320
    /// A caller might want to use this to obtain a reference to the "real" underlying store
321
    /// (if applicable) and check if it implements some special traits that allow optimizations.
322
    /// Note: If the store performs complex operations on the data, it should return itself.
323
    #[inline]
324
88
    pub fn inner_store<'a, K: Into<StoreKey<'a>>>(&self, digest: Option<K>) -> &dyn StoreDriver {
325
88
        self.inner.inner_store(digest.map(Into::into))
326
88
    }
327
328
    /// Tries to cast the underlying store to the given type.
329
    #[inline]
330
55
    pub fn downcast_ref<U: StoreDriver>(&self, maybe_digest: Option<StoreKey<'_>>) -> Option<&U> {
331
55
        self.inner.inner_store(maybe_digest).as_any().downcast_ref()
332
55
    }
333
334
    /// Register health checks used to monitor the store.
335
    #[inline]
336
0
    pub fn register_health(&self, registry: &mut HealthRegistryBuilder) {
337
0
        self.inner.clone().register_health(registry);
338
0
    }
339
}
340
341
impl StoreLike for Store {
342
    #[inline]
343
9.98k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
344
9.98k
        self.inner.as_ref()
345
9.98k
    }
346
347
7
    fn as_pin(&self) -> Pin<&Self> {
348
7
        Pin::new(self)
349
7
    }
350
}
351
352
impl<T> StoreLike for T
353
where
354
    T: StoreDriver + Sized,
355
{
356
    #[inline]
357
7.90k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
358
7.90k
        self
359
7.90k
    }
360
361
54
    fn as_pin(&self) -> Pin<&Self> {
362
54
        Pin::new(self)
363
54
    }
364
}
365
366
pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
367
    /// Returns the immediate inner store driver.
368
    fn as_store_driver(&self) -> &'_ dyn StoreDriver;
369
370
    /// Utility function to return a pinned reference to self.
371
    fn as_pin(&self) -> Pin<&Self>;
372
373
    /// Utility function to return a pinned reference to the store driver.
374
    #[inline]
375
17.8k
    fn as_store_driver_pin(&self) -> Pin<&'_ dyn StoreDriver> {
376
17.8k
        Pin::new(self.as_store_driver())
377
17.8k
    }
378
379
    /// Look up a digest in the store and return None if it does not exist in
380
    /// the store, or Some(size) if it does.
381
    /// Note: On an AC store the size will be incorrect and should not be used!
382
    #[inline]
383
212
    fn has<'a>(
384
212
        &'a self,
385
212
        digest: impl Into<StoreKey<'a>>,
386
212
    ) -> impl Future<Output = Result<Option<u64>, Error>> + 'a {
387
212
        self.as_store_driver_pin().has(digest.into())
388
212
    }
389
390
    /// Look up a list of digests in the store and return a result for each in
391
    /// the same order as input.  The result will either be None if it does not
392
    /// exist in the store, or Some(size) if it does.
393
    /// Note: On an AC store the size will be incorrect and should not be used!
394
    #[inline]
395
19
    fn has_many<'a>(
396
19
        &'a self,
397
19
        digests: &'a [StoreKey<'a>],
398
19
    ) -> impl Future<Output = Result<Vec<Option<u64>>, Error>> + Send + 'a {
399
19
        self.as_store_driver_pin().has_many(digests)
400
19
    }
401
402
    /// The implementation of the above has and has_many functions.  See their
403
    /// documentation for details.
404
    #[inline]
405
41
    fn has_with_results<'a>(
406
41
        &'a self,
407
41
        digests: &'a [StoreKey<'a>],
408
41
        results: &'a mut [Option<u64>],
409
41
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
410
41
        self.as_store_driver_pin()
411
41
            .has_with_results(digests, results)
412
41
    }
413
414
    /// List all the keys in the store that are within the given range.
415
    /// `handler` is called for each key in the range. If `handler` returns
416
    /// false, the listing is stopped.
417
    ///
418
    /// The number of keys passed through the handler is the return value.
419
    #[inline]
420
7
    fn list<'a, 'b>(
421
7
        &'a self,
422
7
        range: impl RangeBounds<StoreKey<'b>> + Send + 'b,
423
7
        mut handler: impl for<'c> FnMut(&'c StoreKey) -> bool + Send + Sync + 'a,
424
7
    ) -> impl Future<Output = Result<u64, Error>> + Send + 'a
425
7
    where
426
7
        'b: 'a,
427
7
    {
428
        // Note: We use a manual async move, so the future can own the `range` and `handler`,
429
        // otherwise we'd require the caller to pass them in by reference making more borrow
430
        // checker noise.
431
7
        async move {
432
7
            self.as_store_driver_pin()
433
7
                .list(
434
7
                    (
435
7
                        range.start_bound().map(StoreKey::borrow),
436
7
                        range.end_bound().map(StoreKey::borrow),
437
7
                    ),
438
7
                    &mut handler,
439
7
                )
440
0
                .await
441
7
        }
442
7
    }
443
444
    /// Sends the data to the store.
445
    #[inline]
446
5.14k
    fn update<'a>(
447
5.14k
        &'a self,
448
5.14k
        digest: impl Into<StoreKey<'a>>,
449
5.14k
        reader: DropCloserReadHalf,
450
5.14k
        upload_size: UploadSizeInfo,
451
5.14k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
452
5.14k
        self.as_store_driver_pin()
453
5.14k
            .update(digest.into(), reader, upload_size)
454
5.14k
    }
455
456
    /// Any optimizations the store might want to expose to the callers.
457
    /// By default, no optimizations are exposed.
458
    #[inline]
459
6
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
460
6
        self.as_store_driver_pin().optimized_for(optimization)
461
6
    }
462
463
    /// Specialized version of `.update()` which takes a `ResumeableFileSlot`.
464
    /// This is useful if the underlying store can optimize the upload process
465
    /// when it knows the data is coming from a file.
466
    #[inline]
467
7
    fn update_with_whole_file<'a>(
468
7
        &'a self,
469
7
        digest: impl Into<StoreKey<'a>>,
470
7
        file: fs::ResumeableFileSlot,
471
7
        upload_size: UploadSizeInfo,
472
7
    ) -> impl Future<Output = Result<Option<fs::ResumeableFileSlot>, Error>> + Send + 'a {
473
7
        self.as_store_driver_pin()
474
7
            .update_with_whole_file(digest.into(), file, upload_size)
475
7
    }
476
477
    /// Utility to send all the data to the store when you have all the bytes.
478
    #[inline]
479
5.66k
    fn update_oneshot<'a>(
480
5.66k
        &'a self,
481
5.66k
        digest: impl Into<StoreKey<'a>>,
482
5.66k
        data: Bytes,
483
5.66k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
484
5.66k
        self.as_store_driver_pin()
485
5.66k
            .update_oneshot(digest.into(), data)
486
5.66k
    }
487
488
    /// Retrieves part of the data from the store and writes it to the given writer.
489
    #[inline]
490
1.27k
    fn get_part<'a>(
491
1.27k
        &'a self,
492
1.27k
        digest: impl Into<StoreKey<'a>>,
493
1.27k
        mut writer: impl BorrowMut<DropCloserWriteHalf> + Send + 'a,
494
1.27k
        offset: u64,
495
1.27k
        length: Option<u64>,
496
1.27k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
497
1.27k
        let key = digest.into();
498
        // Note: We need to capture `writer` just in case the caller
499
        // expects the drop() method to be called on it when the future
500
        // is done due to the complex interaction between the DropCloserWriteHalf
501
        // and the DropCloserReadHalf during drop().
502
1.27k
        async move {
503
1.27k
            self.as_store_driver_pin()
504
1.27k
                .get_part(key, writer.borrow_mut(), offset, length)
505
136
                .await
506
1.27k
        }
507
1.27k
    }
508
509
    /// Utility that works the same as `.get_part()`, but writes all the data.
510
    #[inline]
511
15
    fn get<'a>(
512
15
        &'a self,
513
15
        key: impl Into<StoreKey<'a>>,
514
15
        writer: DropCloserWriteHalf,
515
15
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
516
15
        self.as_store_driver_pin().get(key.into(), writer)
517
15
    }
518
519
    /// Utility that will return all the bytes at once instead of in a streaming manner.
520
    #[inline]
521
5.34k
    fn get_part_unchunked<'a>(
522
5.34k
        &'a self,
523
5.34k
        key: impl Into<StoreKey<'a>>,
524
5.34k
        offset: u64,
525
5.34k
        length: Option<u64>,
526
5.34k
    ) -> impl Future<Output = Result<Bytes, Error>> + Send + 'a {
527
5.34k
        self.as_store_driver_pin()
528
5.34k
            .get_part_unchunked(key.into(), offset, length)
529
5.34k
    }
530
531
    /// Default implementation of the health check. Some stores may want to override this
532
    /// in situations where the default implementation is not sufficient.
533
    #[inline]
534
0
    fn check_health(
535
0
        &self,
536
0
        namespace: Cow<'static, str>,
537
0
    ) -> impl Future<Output = HealthStatus> + Send {
538
0
        self.as_store_driver_pin().check_health(namespace)
539
0
    }
540
}
541
542
#[async_trait]
543
pub trait StoreDriver:
544
    Sync + Send + Unpin + MetricsComponent + HealthStatusIndicator + 'static
545
{
546
    /// See: [`StoreLike::has`] for details.
547
    #[inline]
548
220
    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
549
220
        let mut result = [None];
550
220
        self.has_with_results(&[key], &mut result).
await64
?0
;
551
220
        Ok(result[0])
552
440
    }
553
554
    /// See: [`StoreLike::has_many`] for details.
555
    #[inline]
556
    async fn has_many(
557
        self: Pin<&Self>,
558
        digests: &[StoreKey<'_>],
559
19
    ) -> Result<Vec<Option<u64>>, Error> {
560
19
        let mut results = vec![None; digests.len()];
561
19
        self.has_with_results(digests, &mut results).
await12
?0
;
562
19
        Ok(results)
563
38
    }
564
565
    /// See: [`StoreLike::has_with_results`] for details.
566
    async fn has_with_results(
567
        self: Pin<&Self>,
568
        digests: &[StoreKey<'_>],
569
        results: &mut [Option<u64>],
570
    ) -> Result<(), Error>;
571
572
    /// See: [`StoreLike::list`] for details.
573
    async fn list(
574
        self: Pin<&Self>,
575
        _range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
576
        _handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
577
0
    ) -> Result<u64, Error> {
578
        // TODO(allada) We should force all stores to implement this function instead of
579
        // providing a default implementation.
580
0
        Err(make_err!(
581
0
            Code::Unimplemented,
582
0
            "Store::list() not implemented for this store"
583
0
        ))
584
0
    }
585
586
    /// See: [`StoreLike::update`] for details.
587
    async fn update(
588
        self: Pin<&Self>,
589
        key: StoreKey<'_>,
590
        reader: DropCloserReadHalf,
591
        upload_size: UploadSizeInfo,
592
    ) -> Result<(), Error>;
593
594
    /// See: [`StoreLike::optimized_for`] for details.
595
46
    fn optimized_for(&self, _optimization: StoreOptimizations) -> bool {
596
46
        false
597
46
    }
598
599
    /// See: [`StoreLike::update_with_whole_file`] for details.
600
    async fn update_with_whole_file(
601
        self: Pin<&Self>,
602
        key: StoreKey<'_>,
603
        mut file: fs::ResumeableFileSlot,
604
        upload_size: UploadSizeInfo,
605
1
    ) -> Result<Option<fs::ResumeableFileSlot>, Error> {
606
1
        let inner_store = self.inner_store(Some(key.borrow()));
607
1
        if inner_store.optimized_for(StoreOptimizations::FileUpdates) {
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [Folded - Ignored]
  Branch (607:12): [True: 0, False: 1]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [Folded - Ignored]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
  Branch (607:12): [True: 0, False: 0]
608
0
            error_if!(
609
0
                addr_eq(inner_store, &*self),
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [Folded - Ignored]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [Folded - Ignored]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
  Branch (609:17): [True: 0, False: 0]
610
                "Store::inner_store() returned self when optimization present"
611
            );
612
0
            return Pin::new(inner_store)
613
0
                .update_with_whole_file(key, file, upload_size)
614
0
                .await;
615
1
        }
616
67
        slow_update_store_with_file(self, key, &mut file, upload_size).await
?0
;
617
1
        Ok(Some(file))
618
2
    }
619
620
    /// See: [`StoreLike::update_oneshot`] for details.
621
5.73k
    async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> {
622
        // TODO(blaise.bruer) This is extremely inefficient, since we have exactly
623
        // what we need here. Maybe we could instead make a version of the stream
624
        // that can take objects already fully in memory instead?
625
5.73k
        let (mut tx, rx) = make_buf_channel_pair();
626
627
5.73k
        let data_len =
628
5.73k
            u64::try_from(data.len()).err_tip(|| 
"Could not convert data.len() to u64"0
)
?0
;
629
5.73k
        let send_fut = async move {
630
5.73k
            // Only send if we are not EOF.
631
5.73k
            if !data.is_empty() {
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 31, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [Folded - Ignored]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 1, False: 0]
  Branch (631:16): [True: 16, False: 40]
  Branch (631:16): [True: 8, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 4, False: 1]
  Branch (631:16): [True: 7, False: 0]
  Branch (631:16): [True: 104, False: 0]
  Branch (631:16): [True: 2, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 2, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 4, False: 0]
  Branch (631:16): [True: 1, False: 0]
  Branch (631:16): [True: 2, False: 0]
  Branch (631:16): [True: 2, False: 0]
  Branch (631:16): [True: 2, False: 0]
  Branch (631:16): [True: 2, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 1, False: 0]
  Branch (631:16): [True: 12, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 406, False: 1]
  Branch (631:16): [Folded - Ignored]
  Branch (631:16): [True: 3, False: 2]
  Branch (631:16): [True: 2, False: 0]
  Branch (631:16): [True: 1, False: 0]
  Branch (631:16): [True: 13, False: 1]
  Branch (631:16): [True: 25, False: 13]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 1, False: 0]
  Branch (631:16): [True: 5.00k, False: 0]
  Branch (631:16): [True: 6, False: 0]
  Branch (631:16): [True: 4, False: 0]
  Branch (631:16): [True: 2, False: 0]
  Branch (631:16): [True: 0, False: 0]
  Branch (631:16): [True: 7, False: 0]
632
5.67k
                tx.send(data)
633
38
                    .await
634
5.67k
                    .err_tip(|| 
"Failed to write data in update_oneshot"0
)
?0
;
635
58
            }
636
5.73k
            tx.send_eof()
637
5.73k
                .err_tip(|| 
"Failed to write EOF in update_oneshot"0
)
?0
;
638
5.73k
            Ok(())
639
5.73k
        };
640
5.73k
        try_join!(
641
5.73k
            send_fut,
642
5.73k
            self.update(key, rx, UploadSizeInfo::ExactSize(data_len))
643
5.73k
        )
?2
;
644
5.73k
        Ok(())
645
11.4k
    }
646
647
    /// See: [`StoreLike::get_part`] for details.
    ///
    /// Streams a portion of the entry identified by `key` into `writer`,
    /// starting at `offset`. A `length` of `None` appears to mean "to the end
    /// of the entry" (the `get` default method passes `0, None` to read
    /// everything) — exact semantics are defined by each implementation.
    async fn get_part(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        writer: &mut DropCloserWriteHalf,
        offset: u64,
        length: Option<u64>,
    ) -> Result<(), Error>;
655
656
    /// See: [`StoreLike::get`] for details.
    ///
    /// Convenience wrapper that streams the entire entry: delegates to
    /// `get_part` with offset `0` and no length limit.
    #[inline]
    async fn get(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        mut writer: DropCloserWriteHalf,
    ) -> Result<(), Error> {
        self.get_part(key, &mut writer, 0, None).await
    }
665
666
    /// See: [`StoreLike::get_part_unchunked`] for details.
667
    async fn get_part_unchunked(
668
        self: Pin<&Self>,
669
        key: StoreKey<'_>,
670
        offset: u64,
671
        length: Option<u64>,
672
5.42k
    ) -> Result<Bytes, Error> {
673
5.42k
        let length_usize = length
674
5.42k
            .map(|v| 
usize::try_from(v).err_tip(2.21k
||
"Could not convert length to usize"0
)2.21k
)
675
5.42k
            .transpose()
?0
;
676
677
        // TODO(blaise.bruer) This is extremely inefficient, since we have exactly
678
        // what we need here. Maybe we could instead make a version of the stream
679
        // that can take objects already fully in memory instead?
680
5.42k
        let (mut tx, mut rx) = make_buf_channel_pair();
681
682
5.42k
        let (data_res, get_part_res) = join!(
683
5.42k
            rx.consume(length_usize),
684
5.42k
            // We use a closure here to ensure that the `tx` is dropped when the
685
5.42k
            // future is done.
686
5.42k
            async move { self.get_part(key, &mut tx, offset, length).
await4.08k
},
687
5.42k
        );
688
5.42k
        get_part_res
689
5.42k
            .err_tip(|| 
"Failed to get_part in get_part_unchunked"9
)
690
5.42k
            .merge(data_res.err_tip(|| 
"Failed to read stream to completion in get_part_unchunked"9
))
691
10.8k
    }
692
693
    /// See: [`StoreLike::check_health`] for details.
    ///
    /// Default round-trip health probe: writes a deterministic, per-store
    /// pseudo-random payload via `update_oneshot`, verifies its existence and
    /// size via `has`, reads it back via `get_part_unchunked`, and compares
    /// the bytes. Returns a failed `HealthStatus` naming the first step that
    /// misbehaved, otherwise an ok status.
    async fn check_health(self: Pin<&Self>, namespace: Cow<'static, str>) -> HealthStatus {
        let digest_data_size = default_digest_size_health_check();
        let mut digest_data = vec![0u8; digest_data_size];

        // Derive a stable RNG seed from the namespace plus the store name so
        // each store gets its own — but repeatable — payload.
        let mut namespace_hasher = StdHasher::new();
        namespace.hash(&mut namespace_hasher);
        self.get_name().hash(&mut namespace_hasher);
        let hash_seed = namespace_hasher.finish();

        // Fill the digest data with random data based on a stable
        // hash of the namespace and store name. Intention is to
        // have randomly filled data that is unique per store and
        // does not change between health checks. This is to ensure
        // we are not adding more data to store on each health check.
        let mut rng: StdRng = StdRng::seed_from_u64(hash_seed);
        rng.fill_bytes(&mut digest_data);

        // Key the payload by its own content digest so every health check
        // addresses the same store entry.
        let mut digest_hasher = default_digest_hasher_func().hasher();
        digest_hasher.update(&digest_data);
        let digest_data_len = digest_data.len() as u64;
        let digest_info = StoreKey::from(digest_hasher.finalize_digest());

        let digest_bytes = bytes::Bytes::copy_from_slice(&digest_data);

        // Step 1: upload the payload.
        if let Err(e) = self
            .update_oneshot(digest_info.borrow(), digest_bytes.clone())
            .await
        {
            return HealthStatus::new_failed(
                self.get_ref(),
                format!("Store.update_oneshot() failed: {e}").into(),
            );
        }

        // Step 2: the entry must exist and report exactly the size we wrote.
        match self.has(digest_info.borrow()).await {
            Ok(Some(s)) => {
                if s != digest_data_len {
                    return HealthStatus::new_failed(
                        self.get_ref(),
                        format!("Store.has() size mismatch {s} != {digest_data_len}").into(),
                    );
                }
            }
            Ok(None) => {
                return HealthStatus::new_failed(
                    self.get_ref(),
                    "Store.has() size not found".into(),
                );
            }
            Err(e) => {
                return HealthStatus::new_failed(
                    self.get_ref(),
                    format!("Store.has() failed: {e}").into(),
                );
            }
        }

        // Step 3: read the entry back and compare byte-for-byte.
        match self
            .get_part_unchunked(digest_info, 0, Some(digest_data_len))
            .await
        {
            Ok(b) => {
                if b != digest_bytes {
                    return HealthStatus::new_failed(
                        self.get_ref(),
                        "Store.get_part_unchunked() data mismatch".into(),
                    );
                }
            }
            Err(e) => {
                return HealthStatus::new_failed(
                    self.get_ref(),
                    format!("Store.get_part_unchunked() failed: {e}").into(),
                );
            }
        }

        HealthStatus::new_ok(self.get_ref(), "Successfully store health check".into())
    }
773
774
    /// See: [`Store::inner_store`] for details.
    fn inner_store(&self, _digest: Option<StoreKey<'_>>) -> &dyn StoreDriver;

    /// Returns an Any variation of whatever Self is.
    fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static);
    /// Same as [`Self::as_any`], but returns an `Arc` so the result can
    /// outlive the borrow of `self`.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static>;

    // Register health checks used to monitor the store.
    // Default is a no-op; stores that support health checks override this.
    fn register_health(self: Arc<Self>, _registry: &mut HealthRegistryBuilder) {}
783
}
784
785
/// The instructions on how to decode a value from a Bytes & version into
/// the underlying type.
pub trait SchedulerStoreDecodeTo {
    /// The concrete type produced by a successful decode.
    type DecodeOutput;
    /// Decodes `data` (stored at `version`) into [`Self::DecodeOutput`].
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error>;
}
791
792
/// A handle to a single scheduler-store subscription.
pub trait SchedulerSubscription: Send + Sync {
    /// Waits for the next change notification. Presumably resolves when the
    /// subscribed value changes — confirm against implementations.
    fn changed(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
}
795
796
/// Creates and manages [`SchedulerSubscription`]s for keys in a scheduler store.
pub trait SchedulerSubscriptionManager: Send + Sync {
    /// The subscription handle type handed out by [`Self::subscribe`].
    type Subscription: SchedulerSubscription;

    /// Triggers a notification for `value`. Test-only hook, per the name.
    fn notify_for_test(&self, value: String);

    /// Creates a subscription for the key derived from `key`.
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
    where
        K: SchedulerStoreKeyProvider;
}
805
806
/// The API surface for a scheduler store.
pub trait SchedulerStore: Send + Sync + 'static {
    /// The subscription manager type used to watch keys in this store.
    type SubscriptionManager: SchedulerSubscriptionManager;

    /// Returns the subscription manager for the scheduler store.
    fn subscription_manager(&self) -> Result<Arc<Self::SubscriptionManager>, Error>;

    /// Updates or inserts an entry into the underlying store.
    /// Metadata about the key is attached to the compile-time type.
    /// If StoreKeyProvider::Versioned is TrueValue, the data will not
    /// be updated if the current version in the database does not match
    /// the version in the passed in data.
    /// No guarantees are made about when Version is FalseValue.
    /// Indexes are guaranteed to be updated atomically with the data.
    fn update_data<T>(&self, data: T) -> impl Future<Output = Result<Option<u64>, Error>> + Send
    where
        T: SchedulerStoreDataProvider
            + SchedulerStoreKeyProvider
            + SchedulerCurrentVersionProvider
            + Send;

    /// Searches for all keys in the store that match the given index prefix.
    fn search_by_index_prefix<K>(
        &self,
        index: K,
    ) -> impl Future<
        Output = Result<
            impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
            Error,
        >,
    > + Send
    where
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send;

    /// Returns data for the provided key with the given version if
    /// StoreKeyProvider::Versioned is TrueValue.
    fn get_and_decode<K>(
        &self,
        key: K,
    ) -> impl Future<Output = Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>> + Send
    where
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send;
}
849
850
/// A type that is used to let the scheduler store know what
/// index is being requested.
pub trait SchedulerIndexProvider {
    /// Only keys inserted with this prefix will be indexed.
    const KEY_PREFIX: &'static str;

    /// The name of the index.
    const INDEX_NAME: &'static str;

    /// The sort key for the index (if any). Defaults to unsorted.
    const MAYBE_SORT_KEY: Option<&'static str> = None;

    /// If the data is versioned.
    type Versioned: BoolValue;

    /// The value of the index.
    fn index_value(&self) -> Cow<'_, str>;
}
868
869
/// Provides a key to lookup data in the store.
pub trait SchedulerStoreKeyProvider {
    /// If the data is versioned.
    type Versioned: BoolValue;

    /// Returns the key for the data. The key owns its data (`'static`), so it
    /// can outlive the provider.
    fn get_key(&self) -> StoreKey<'static>;
}
877
878
/// Provides data to be stored in the scheduler store.
879
pub trait SchedulerStoreDataProvider {
880
    /// Converts the data into bytes to be stored in the store.
881
    fn try_into_bytes(self) -> Result<Bytes, Error>;
882
883
    /// Returns the indexes for the data if any.
884
1
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
885
1
        Ok(Vec::new())
886
1
    }
887
}
888
889
/// Provides the current version of the data in the store.
pub trait SchedulerCurrentVersionProvider {
    /// Returns the current version of the data in the store.
    fn current_version(&self) -> u64;
}
894
895
/// Default implementation for when we are not providing a version
/// for the data.
impl<T> SchedulerCurrentVersionProvider for T
where
    T: SchedulerStoreKeyProvider<Versioned = FalseValue>,
{
    // Unversioned data always reports version 0.
    fn current_version(&self) -> u64 {
        0
    }
}
905
906
/// Compile-time boolean values, used as type-level flags (e.g. the
/// `Versioned` associated types above).
pub trait BoolValue {
    const VALUE: bool;
}
/// Marker trait implemented only by the compile-time false type.
pub trait IsFalse {}
/// Marker trait implemented only by the compile-time true type.
pub trait IsTrue {}

/// Type-level `true`.
pub struct TrueValue;
impl IsTrue for TrueValue {}
impl BoolValue for TrueValue {
    const VALUE: bool = true;
}

/// Type-level `false`.
pub struct FalseValue;
impl IsFalse for FalseValue {}
impl BoolValue for FalseValue {
    const VALUE: bool = false;
}