Coverage Report

Created: 2026-03-31 21:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/store_trait.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::borrow::{Borrow, BorrowMut};
16
use core::convert::Into;
17
use core::fmt::{self, Debug, Display};
18
use core::future;
19
use core::hash::{Hash, Hasher};
20
use core::ops::{Bound, RangeBounds};
21
use core::pin::Pin;
22
use core::ptr::addr_eq;
23
use std::borrow::Cow;
24
use std::collections::hash_map::DefaultHasher as StdHasher;
25
use std::ffi::OsString;
26
use std::sync::{Arc, OnceLock};
27
28
use async_trait::async_trait;
29
use bytes::{Bytes, BytesMut};
30
use futures::{Future, FutureExt, Stream, join, try_join};
31
use nativelink_error::{Code, Error, ResultExt, error_if, make_err};
32
use nativelink_metric::MetricsComponent;
33
use rand::rngs::StdRng;
34
use rand::{RngCore, SeedableRng};
35
use serde::{Deserialize, Serialize};
36
use tokio::io::{AsyncReadExt, AsyncSeekExt};
37
use tracing::warn;
38
39
use crate::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair};
40
use crate::common::DigestInfo;
41
use crate::digest_hasher::{DigestHasher, DigestHasherFunc, default_digest_hasher_func};
42
use crate::fs;
43
use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
44
45
static DEFAULT_DIGEST_SIZE_HEALTH_CHECK: OnceLock<usize> = OnceLock::new();
46
/// Default digest size for health check data. Any change in this value
47
/// changes the default contract. `GlobalConfig` should be updated to reflect
48
/// changes in this value.
49
pub const DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG: usize = 1024 * 1024;
50
51
// Get the default digest size for health check data, if value is unset a system wide default is used.
52
1
pub fn default_digest_size_health_check() -> usize {
53
1
    *DEFAULT_DIGEST_SIZE_HEALTH_CHECK.get_or_init(|| DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG)
54
1
}
55
56
/// Set the default digest size for health check data, this should be called once.
57
0
pub fn set_default_digest_size_health_check(size: usize) -> Result<(), Error> {
58
0
    DEFAULT_DIGEST_SIZE_HEALTH_CHECK.set(size).map_err(|_| {
59
0
        make_err!(
60
0
            Code::Internal,
61
            "set_default_digest_size_health_check already set"
62
        )
63
0
    })
64
0
}
65
66
/// Size information a caller supplies alongside an upload.
#[derive(Debug, PartialEq, Eq, Copy, Clone, Serialize, Deserialize)]
pub enum UploadSizeInfo {
    /// When the data transfer amount is known to be exact size, this enum should be used.
    /// The receiver store can use this to better optimize the way the data is sent or stored.
    ExactSize(u64),

    /// When the data transfer amount is not known to be exact, the caller should use this enum
    /// to provide the maximum size that could possibly be sent. This will bypass the exact size
    /// checks, but still provide useful information to the underlying store about the data being
    /// sent that it can then use to optimize the upload process.
    MaxSize(u64),
}
78
79
/// Utility to send all the data to the store from a file.
80
// Note: This is not inlined because some code may want to bypass any underlying
81
// optimizations that may be present in the inner store.
82
6
pub async fn slow_update_store_with_file<S: StoreDriver + ?Sized>(
83
6
    store: Pin<&S>,
84
6
    digest: impl Into<StoreKey<'_>>,
85
6
    file: &mut fs::FileSlot,
86
6
    upload_size: UploadSizeInfo,
87
6
) -> Result<(), Error> {
88
6
    file.rewind()
89
6
        .await
90
6
        .err_tip(|| "Failed to rewind in upload_file_to_store")
?0
;
91
6
    let (mut tx, rx) = make_buf_channel_pair();
92
93
6
    let update_fut = store
94
6
        .update(digest.into(), rx, upload_size)
95
6
        .map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store"));
96
6
    let read_data_fut = async move {
97
        loop {
98
75
            let mut buf = BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE);
99
75
            let read = file
100
75
                .read_buf(&mut buf)
101
75
                .await
102
75
                .err_tip(|| "Failed to read in upload_file_to_store")
?0
;
103
75
            if read == 0 {
104
6
                break;
105
69
            }
106
69
            tx.send(buf.freeze())
107
69
                .await
108
69
                .err_tip(|| "Failed to send in upload_file_to_store")
?0
;
109
        }
110
6
        tx.send_eof()
111
6
            .err_tip(|| "Could not send EOF to store in upload_file_to_store")
112
6
    };
113
6
    tokio::pin!(read_data_fut);
114
6
    let (update_res, read_res) = tokio::join!(update_fut, read_data_fut);
115
6
    update_res.merge(read_res)
116
6
}
117
118
/// Optimizations that stores may want to expose to the callers.
/// This is useful for specific cases when the store can optimize the processing
/// of the data being processed. Callers query a store for one of these through
/// its `optimized_for()` method.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum StoreOptimizations {
    /// The store can optimize the upload process when it knows the data is coming from a file.
    FileUpdates,

    /// The store will ignore data uploads.
    NoopUpdates,

    /// The store will never serve downloads.
    NoopDownloads,

    /// The store will determine whether a key has associated data once a read has been
    /// attempted instead of calling `.has()` first.
    LazyExistenceOnSync,

    /// The store provides an optimized `update_oneshot` implementation that bypasses
    /// channel overhead for direct Bytes writes. Stores with this optimization can
    /// accept complete data directly without going through the MPSC channel.
    SubscribesToUpdateOneshot,
}
141
142
/// A wrapper struct for [`StoreKey`] to work around
/// lifetime limitations in `HashMap::get()` as described in
/// <https://github.com/rust-lang/rust/issues/80389>
///
/// As such this is a wrapper type that is stored in the
/// maps using the workaround as described in
/// <https://blinsay.com/blog/compound-keys/>
///
/// The wrapper holds a `'static` key and is `#[repr(transparent)]` over it.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct StoreKeyBorrow(StoreKey<'static>);
152
153
impl From<StoreKey<'static>> for StoreKeyBorrow {
154
5.86k
    fn from(key: StoreKey<'static>) -> Self {
155
5.86k
        Self(key)
156
5.86k
    }
157
}
158
159
impl From<StoreKeyBorrow> for StoreKey<'static> {
160
0
    fn from(key_borrow: StoreKeyBorrow) -> Self {
161
0
        key_borrow.0
162
0
    }
163
}
164
165
impl<'a> Borrow<StoreKey<'a>> for StoreKeyBorrow {
166
4.77k
    fn borrow(&self) -> &StoreKey<'a> {
167
4.77k
        &self.0
168
4.77k
    }
169
}
170
171
impl<'a> Borrow<StoreKey<'a>> for &StoreKeyBorrow {
    /// Same as the impl on `StoreKeyBorrow`, but for a reference to the wrapper.
    fn borrow(&self) -> &StoreKey<'a> {
        let key_borrow: &StoreKeyBorrow = self;
        &key_borrow.0
    }
}
176
177
/// Holds something that can be converted into a key the
/// store API can understand. Generally this is a digest
/// but it can also be a string if the caller wishes to
/// store the data directly and reference it by a string
/// directly.
///
/// The string variant holds a `Cow`, so a key may either borrow its
/// string for `'a` or own it.
#[derive(Debug, Eq)]
pub enum StoreKey<'a> {
    /// A string key.
    Str(Cow<'a, str>),

    /// A key that is a digest.
    Digest(DigestInfo),
}
190
191
impl<'a> StoreKey<'a> {
192
    /// Creates a new store key from a string.
193
26
    pub const fn new_str(s: &'a str) -> Self {
194
26
        StoreKey::Str(Cow::Borrowed(s))
195
26
    }
196
197
    /// Returns a shallow clone of the key.
198
    /// This is extremely cheap and should be used when clone
199
    /// is needed but the key is not going to be modified.
200
    #[must_use]
201
    #[allow(
202
        clippy::missing_const_for_fn,
203
        reason = "False positive on stable, but not on nightly"
204
    )]
205
3.25k
    pub fn borrow(&'a self) -> Self {
206
56
        match self {
207
15
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Borrowed(s)),
208
41
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Borrowed(s)),
209
3.20k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
210
        }
211
3.25k
    }
212
213
    /// Converts the key into an owned version. This is useful
214
    /// when the caller needs an owned version of the key.
215
12.0k
    pub fn into_owned(self) -> StoreKey<'static> {
216
70
        match self {
217
14
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Owned(s)),
218
56
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Owned(s.to_owned())),
219
12.0k
            StoreKey::Digest(d) => StoreKey::Digest(d),
220
        }
221
12.0k
    }
222
223
    /// Converts the key into a digest. This is useful when the caller
224
    /// must have a digest key. If the data is not a digest, it may
225
    /// hash the underlying key and return a digest of the hash of the key
226
117
    pub fn into_digest(self) -> DigestInfo {
227
117
        match self {
228
115
            StoreKey::Digest(digest) => digest,
229
2
            StoreKey::Str(s) => {
230
2
                let mut hasher = DigestHasherFunc::Blake3.hasher();
231
2
                hasher.update(s.as_bytes());
232
2
                hasher.finalize_digest()
233
            }
234
        }
235
117
    }
236
237
    /// Returns the key as a string. If the key is a digest, it will
238
    /// return a string representation of the digest. If the key is a string,
239
    /// it will return the string itself.
240
265
    pub fn as_str(&'a self) -> Cow<'a, str> {
241
130
        match self {
242
129
            StoreKey::Str(Cow::Owned(s)) => Cow::Borrowed(s),
243
1
            StoreKey::Str(Cow::Borrowed(s)) => Cow::Borrowed(s),
244
135
            StoreKey::Digest(d) => Cow::Owned(format!("{d}")),
245
        }
246
265
    }
247
}
248
249
impl Clone for StoreKey<'static> {
250
10.2k
    fn clone(&self) -> Self {
251
10.2k
        match self {
252
28
            StoreKey::Str(s) => StoreKey::Str(s.clone()),
253
10.2k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
254
        }
255
10.2k
    }
256
}
257
258
impl PartialOrd for StoreKey<'_> {
259
40
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
260
40
        Some(self.cmp(other))
261
40
    }
262
}
263
264
impl Ord for StoreKey<'_> {
265
57
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
266
57
        match (self, other) {
267
57
            (StoreKey::Str(a), StoreKey::Str(b)) => a.cmp(b),
268
0
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a.cmp(b),
269
0
            (StoreKey::Str(_), StoreKey::Digest(_)) => core::cmp::Ordering::Less,
270
0
            (StoreKey::Digest(_), StoreKey::Str(_)) => core::cmp::Ordering::Greater,
271
        }
272
57
    }
273
}
274
275
impl PartialEq for StoreKey<'_> {
276
5.67k
    fn eq(&self, other: &Self) -> bool {
277
5.67k
        match (self, other) {
278
51
            (StoreKey::Str(a), StoreKey::Str(b)) => a == b,
279
5.62k
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a == b,
280
0
            _ => false,
281
        }
282
5.67k
    }
283
}
284
285
impl Hash for StoreKey<'_> {
286
25.3k
    fn hash<H: Hasher>(&self, state: &mut H) {
287
        /// Salts the hash with the enum value that represents
288
        /// the type of the key.
289
        #[repr(u8)]
290
        enum HashId {
291
            Str = 0,
292
            Digest = 1,
293
        }
294
25.3k
        match self {
295
44
            StoreKey::Str(s) => {
296
44
                (HashId::Str as u8).hash(state);
297
44
                s.hash(state);
298
44
            }
299
25.3k
            StoreKey::Digest(d) => {
300
25.3k
                (HashId::Digest as u8).hash(state);
301
25.3k
                d.hash(state);
302
25.3k
            }
303
        }
304
25.3k
    }
305
}
306
307
impl<'a> From<&'a str> for StoreKey<'a> {
308
1
    fn from(s: &'a str) -> Self {
309
1
        StoreKey::Str(Cow::Borrowed(s))
310
1
    }
311
}
312
313
impl From<String> for StoreKey<'static> {
314
0
    fn from(s: String) -> Self {
315
0
        StoreKey::Str(Cow::Owned(s))
316
0
    }
317
}
318
319
impl From<DigestInfo> for StoreKey<'_> {
    /// Creates a digest key from `d`.
    fn from(d: DigestInfo) -> Self {
        Self::Digest(d)
    }
}
324
325
impl From<&DigestInfo> for StoreKey<'_> {
    /// Creates a digest key from a copy of `d`.
    fn from(d: &DigestInfo) -> Self {
        let digest: DigestInfo = *d;
        Self::Digest(digest)
    }
}
330
331
// mostly for use with tracing::Value
332
impl Display for StoreKey<'_> {
333
31
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
334
31
        match self {
335
28
            StoreKey::Str(s) => {
336
28
                write!(f, "{s}")
337
            }
338
3
            StoreKey::Digest(d) => {
339
3
                write!(f, "Digest: {d}")
340
            }
341
        }
342
31
    }
343
}
344
345
/// A cloneable handle to a type-erased [`StoreDriver`].
#[derive(Clone, MetricsComponent)]
#[repr(transparent)]
pub struct Store {
    // The wrapped driver. It sits behind an `Arc`, so cloning a `Store`
    // shares the same driver rather than duplicating it.
    #[metric]
    inner: Arc<dyn StoreDriver>,
}
351
352
impl Debug for Store {
353
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354
0
        f.debug_struct("Store").finish_non_exhaustive()
355
0
    }
356
}
357
358
impl Store {
359
329
    pub fn new(inner: Arc<dyn StoreDriver>) -> Self {
360
329
        Self { inner }
361
329
    }
362
363
    /// Returns the immediate inner store driver.
364
    /// Note: This does not recursively try to resolve underlying store drivers
365
    /// like `.inner_store()` does.
366
    #[inline]
367
1
    pub fn into_inner(self) -> Arc<dyn StoreDriver> {
368
1
        self.inner
369
1
    }
370
371
    /// Gets the underlying store for the given digest.
372
    /// A caller might want to use this to obtain a reference to the "real" underlying store
373
    /// (if applicable) and check if it implements some special traits that allow optimizations.
374
    /// Note: If the store performs complex operations on the data, it should return itself.
375
    #[inline]
376
236
    pub fn inner_store<'a, K: Into<StoreKey<'a>>>(&self, digest: Option<K>) -> &dyn StoreDriver {
377
236
        self.inner.inner_store(digest.map(Into::into))
378
236
    }
379
380
    /// Tries to cast the underlying store to the given type.
381
    #[inline]
382
68
    pub fn downcast_ref<U: StoreDriver>(&self, maybe_digest: Option<StoreKey<'_>>) -> Option<&U> {
383
68
        self.inner.inner_store(maybe_digest).as_any().downcast_ref()
384
68
    }
385
386
    /// Register health checks used to monitor the store.
387
    #[inline]
388
0
    pub fn register_health(&self, registry: &mut HealthRegistryBuilder) {
389
0
        self.inner.clone().register_health(registry);
390
0
    }
391
392
    #[inline]
393
8
    pub fn register_remove_callback(
394
8
        &self,
395
8
        callback: Arc<dyn RemoveItemCallback>,
396
8
    ) -> Result<(), Error> {
397
8
        self.inner.clone().register_remove_callback(callback)
398
8
    }
399
}
400
401
impl StoreLike for Store {
402
    #[inline]
403
10.1k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
404
10.1k
        self.inner.as_ref()
405
10.1k
    }
406
407
7
    fn as_pin(&self) -> Pin<&Self> {
408
7
        Pin::new(self)
409
7
    }
410
}
411
412
impl<T> StoreLike for T
413
where
414
    T: StoreDriver + Sized,
415
{
416
    #[inline]
417
8.12k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
418
8.12k
        self
419
8.12k
    }
420
421
94
    fn as_pin(&self) -> Pin<&Self> {
422
94
        Pin::new(self)
423
94
    }
424
}
425
426
pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
427
    /// Returns the immediate inner store driver.
428
    fn as_store_driver(&self) -> &'_ dyn StoreDriver;
429
430
    /// Utility function to return a pinned reference to self.
431
    fn as_pin(&self) -> Pin<&Self>;
432
433
    /// Utility function to return a pinned reference to the store driver.
434
    #[inline]
435
18.2k
    fn as_store_driver_pin(&self) -> Pin<&'_ dyn StoreDriver> {
436
18.2k
        Pin::new(self.as_store_driver())
437
18.2k
    }
438
439
    /// Look up a digest in the store and return None if it does not exist in
440
    /// the store, or Some(size) if it does.
441
    /// Note: On an AC store the size will be incorrect and should not be used!
442
    #[inline]
443
279
    fn has<'a>(
444
279
        &'a self,
445
279
        digest: impl Into<StoreKey<'a>>,
446
279
    ) -> impl Future<Output = Result<Option<u64>, Error>> + 'a {
447
279
        self.as_store_driver_pin().has(digest.into())
448
279
    }
449
450
    /// Look up a list of digests in the store and return a result for each in
451
    /// the same order as input.  The result will either be None if it does not
452
    /// exist in the store, or Some(size) if it does.
453
    /// Note: On an AC store the size will be incorrect and should not be used!
454
    #[inline]
455
20
    fn has_many<'a>(
456
20
        &'a self,
457
20
        digests: &'a [StoreKey<'a>],
458
20
    ) -> impl Future<Output = Result<Vec<Option<u64>>, Error>> + Send + 'a {
459
20
        if digests.is_empty() {
460
3
            return future::ready(Ok(vec![])).boxed();
461
17
        }
462
17
        self.as_store_driver_pin().has_many(digests)
463
20
    }
464
465
    /// The implementation of the above has and `has_many` functions.  See their
466
    /// documentation for details.
467
    #[inline]
468
59
    fn has_with_results<'a>(
469
59
        &'a self,
470
59
        digests: &'a [StoreKey<'a>],
471
59
        results: &'a mut [Option<u64>],
472
59
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
473
59
        if digests.is_empty() {
474
3
            return future::ready(Ok(())).boxed();
475
56
        }
476
56
        self.as_store_driver_pin()
477
56
            .has_with_results(digests, results)
478
59
    }
479
480
    /// List all the keys in the store that are within the given range.
481
    /// `handler` is called for each key in the range. If `handler` returns
482
    /// false, the listing is stopped.
483
    ///
484
    /// The number of keys passed through the handler is the return value.
485
    #[inline]
486
15
    fn list<'a, 'b>(
487
15
        &'a self,
488
15
        range: impl RangeBounds<StoreKey<'b>> + Send + 'b,
489
15
        mut handler: impl for<'c> FnMut(&'c StoreKey) -> bool + Send + Sync + 'a,
490
15
    ) -> impl Future<Output = Result<u64, Error>> + Send + 'a
491
15
    where
492
15
        'b: 'a,
493
    {
494
        // Note: We use a manual async move, so the future can own the `range` and `handler`,
495
        // otherwise we'd require the caller to pass them in by reference making more borrow
496
        // checker noise.
497
15
        async move {
498
15
            self.as_store_driver_pin()
499
15
                .list(
500
15
                    (
501
15
                        range.start_bound().map(StoreKey::borrow),
502
15
                        range.end_bound().map(StoreKey::borrow),
503
15
                    ),
504
15
                    &mut handler,
505
15
                )
506
15
                .await
507
15
        }
508
15
    }
509
510
    /// Sends the data to the store.
511
    #[inline]
512
5.25k
    fn update<'a>(
513
5.25k
        &'a self,
514
5.25k
        digest: impl Into<StoreKey<'a>>,
515
5.25k
        reader: DropCloserReadHalf,
516
5.25k
        upload_size: UploadSizeInfo,
517
5.25k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
518
5.25k
        self.as_store_driver_pin()
519
5.25k
            .update(digest.into(), reader, upload_size)
520
5.25k
    }
521
522
    /// Any optimizations the store might want to expose to the callers.
523
    /// By default, no optimizations are exposed.
524
    #[inline]
525
20
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
526
20
        self.as_store_driver_pin().optimized_for(optimization)
527
20
    }
528
529
    /// Specialized version of `.update()` which takes a `FileSlot`.
530
    /// This is useful if the underlying store can optimize the upload process
531
    /// when it knows the data is coming from a file.
532
    #[inline]
533
9
    fn update_with_whole_file<'a>(
534
9
        &'a self,
535
9
        digest: impl Into<StoreKey<'a>>,
536
9
        path: OsString,
537
9
        file: fs::FileSlot,
538
9
        upload_size: UploadSizeInfo,
539
9
    ) -> impl Future<Output = Result<Option<fs::FileSlot>, Error>> + Send + 'a {
540
9
        self.as_store_driver_pin()
541
9
            .update_with_whole_file(digest.into(), path, file, upload_size)
542
9
    }
543
544
    /// Utility to send all the data to the store when you have all the bytes.
545
    #[inline]
546
5.69k
    fn update_oneshot<'a>(
547
5.69k
        &'a self,
548
5.69k
        digest: impl Into<StoreKey<'a>>,
549
5.69k
        data: Bytes,
550
5.69k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
551
5.69k
        self.as_store_driver_pin()
552
5.69k
            .update_oneshot(digest.into(), data)
553
5.69k
    }
554
555
    /// Retrieves part of the data from the store and writes it to the given writer.
556
    #[inline]
557
1.34k
    fn get_part<'a>(
558
1.34k
        &'a self,
559
1.34k
        digest: impl Into<StoreKey<'a>>,
560
1.34k
        mut writer: impl BorrowMut<DropCloserWriteHalf> + Send + 'a,
561
1.34k
        offset: u64,
562
1.34k
        length: Option<u64>,
563
1.34k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
564
1.34k
        let key = digest.into();
565
        // Note: We need to capture `writer` just in case the caller
566
        // expects the drop() method to be called on it when the future
567
        // is done due to the complex interaction between the DropCloserWriteHalf
568
        // and the DropCloserReadHalf during drop().
569
1.34k
        async move {
570
1.34k
            self.as_store_driver_pin()
571
1.34k
                .get_part(key, writer.borrow_mut(), offset, length)
572
1.34k
                .await
573
1.34k
        }
574
1.34k
    }
575
576
    /// Utility that works the same as `.get_part()`, but writes all the data.
577
    #[inline]
578
18
    fn get<'a>(
579
18
        &'a self,
580
18
        key: impl Into<StoreKey<'a>>,
581
18
        writer: DropCloserWriteHalf,
582
18
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
583
18
        self.as_store_driver_pin().get(key.into(), writer)
584
18
    }
585
586
    /// Utility that will return all the bytes at once instead of in a streaming manner.
587
    #[inline]
588
5.28k
    fn get_part_unchunked<'a>(
589
5.28k
        &'a self,
590
5.28k
        key: impl Into<StoreKey<'a>>,
591
5.28k
        offset: u64,
592
5.28k
        length: Option<u64>,
593
5.28k
    ) -> impl Future<Output = Result<Bytes, Error>> + Send + 'a {
594
5.28k
        self.as_store_driver_pin()
595
5.28k
            .get_part_unchunked(key.into(), offset, length)
596
5.28k
    }
597
598
    /// Default implementation of the health check. Some stores may want to override this
599
    /// in situations where the default implementation is not sufficient.
600
    #[inline]
601
1
    fn check_health(
602
1
        &self,
603
1
        namespace: Cow<'static, str>,
604
1
    ) -> impl Future<Output = HealthStatus> + Send {
605
1
        self.as_store_driver_pin().check_health(namespace)
606
1
    }
607
}
608
609
#[async_trait]
610
pub trait StoreDriver:
611
    Sync + Send + Unpin + MetricsComponent + HealthStatusIndicator + 'static
612
{
613
    /// See: [`StoreLike::has`] for details.
614
    #[inline]
615
302
    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
616
        let mut result = [None];
617
        self.has_with_results(&[key], &mut result).await?;
618
        Ok(result[0])
619
302
    }
620
621
    /// See: [`StoreLike::has_many`] for details.
622
    #[inline]
623
    async fn has_many(
624
        self: Pin<&Self>,
625
        digests: &[StoreKey<'_>],
626
17
    ) -> Result<Vec<Option<u64>>, Error> {
627
        let mut results = vec![None; digests.len()];
628
        self.has_with_results(digests, &mut results).await?;
629
        Ok(results)
630
17
    }
631
632
    /// See: [`StoreLike::has_with_results`] for details.
633
    async fn has_with_results(
634
        self: Pin<&Self>,
635
        digests: &[StoreKey<'_>],
636
        results: &mut [Option<u64>],
637
    ) -> Result<(), Error>;
638
639
    /// See: [`StoreLike::list`] for details.
640
    async fn list(
641
        self: Pin<&Self>,
642
        _range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
643
        _handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
644
0
    ) -> Result<u64, Error> {
645
        // TODO(palfrey) We should force all stores to implement this function instead of
646
        // providing a default implementation.
647
        Err(make_err!(
648
            Code::Unimplemented,
649
            "Store::list() not implemented for this store"
650
        ))
651
0
    }
652
653
    /// See: [`StoreLike::update`] for details.
654
    async fn update(
655
        self: Pin<&Self>,
656
        key: StoreKey<'_>,
657
        reader: DropCloserReadHalf,
658
        upload_size: UploadSizeInfo,
659
    ) -> Result<(), Error>;
660
661
    /// See: [`StoreLike::optimized_for`] for details.
662
2
    fn optimized_for(&self, _optimization: StoreOptimizations) -> bool {
663
2
        false
664
2
    }
665
666
    /// See: [`StoreLike::update_with_whole_file`] for details.
667
    async fn update_with_whole_file(
668
        self: Pin<&Self>,
669
        key: StoreKey<'_>,
670
        path: OsString,
671
        mut file: fs::FileSlot,
672
        upload_size: UploadSizeInfo,
673
1
    ) -> Result<Option<fs::FileSlot>, Error> {
674
        let inner_store = self.inner_store(Some(key.borrow()));
675
        if inner_store.optimized_for(StoreOptimizations::FileUpdates) {
676
            error_if!(
677
                addr_eq(inner_store, &raw const *self),
678
                "Store::inner_store() returned self when optimization present"
679
            );
680
            return Pin::new(inner_store)
681
                .update_with_whole_file(key, path, file, upload_size)
682
                .await;
683
        }
684
        slow_update_store_with_file(self, key, &mut file, upload_size).await?;
685
        Ok(Some(file))
686
1
    }
687
688
    /// See: [`StoreLike::update_oneshot`] for details.
689
5.15k
    async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> {
690
        // TODO(palfrey) This is extremely inefficient, since we have exactly
691
        // what we need here. Maybe we could instead make a version of the stream
692
        // that can take objects already fully in memory instead?
693
        let (mut tx, rx) = make_buf_channel_pair();
694
695
        let data_len =
696
            u64::try_from(data.len()).err_tip(|| "Could not convert data.len() to u64")?;
697
5.15k
        let send_fut = async move {
698
            // Only send if we are not EOF.
699
5.15k
            if !data.is_empty() {
700
5.12k
                tx.send(data)
701
5.12k
                    .await
702
5.12k
                    .err_tip(|| "Failed to write data in update_oneshot")
?0
;
703
37
            }
704
5.15k
            tx.send_eof()
705
5.15k
                .err_tip(|| "Failed to write EOF in update_oneshot")
?0
;
706
5.15k
            Ok(())
707
5.15k
        };
708
        try_join!(
709
            send_fut,
710
            self.update(key, rx, UploadSizeInfo::ExactSize(data_len))
711
        )?;
712
        Ok(())
713
5.15k
    }
714
715
    /// See: [`StoreLike::get_part`] for details.
716
    async fn get_part(
717
        self: Pin<&Self>,
718
        key: StoreKey<'_>,
719
        writer: &mut DropCloserWriteHalf,
720
        offset: u64,
721
        length: Option<u64>,
722
    ) -> Result<(), Error>;
723
724
    /// See: [`StoreLike::get`] for details.
725
    #[inline]
726
    async fn get(
727
        self: Pin<&Self>,
728
        key: StoreKey<'_>,
729
        mut writer: DropCloserWriteHalf,
730
18
    ) -> Result<(), Error> {
731
        self.get_part(key, &mut writer, 0, None).await
732
18
    }
733
734
    /// See: [`StoreLike::get_part_unchunked`] for details.
735
    async fn get_part_unchunked(
736
        self: Pin<&Self>,
737
        key: StoreKey<'_>,
738
        offset: u64,
739
        length: Option<u64>,
740
5.40k
    ) -> Result<Bytes, Error> {
741
        let length_usize = length
742
2.27k
            .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
743
            .transpose()?;
744
745
        // TODO(palfrey) This is extremely inefficient, since we have exactly
746
        // what we need here. Maybe we could instead make a version of the stream
747
        // that can take objects already fully in memory instead?
748
        let (mut tx, mut rx) = make_buf_channel_pair();
749
750
        let (data_res, get_part_res) = join!(
751
            rx.consume(length_usize),
752
            // We use a closure here to ensure that the `tx` is dropped when the
753
            // future is done.
754
5.40k
            async move { self.get_part(key, &mut tx, offset, length).await },
755
        );
756
        get_part_res
757
            .err_tip(|| "Failed to get_part in get_part_unchunked")
758
            .merge(data_res.err_tip(|| "Failed to read stream to completion in get_part_unchunked"))
759
5.40k
    }
760
761
    /// See: [`StoreLike::check_health`] for details.
762
1
    async fn check_health(self: Pin<&Self>, namespace: Cow<'static, str>) -> HealthStatus {
763
        let digest_data_size = default_digest_size_health_check();
764
        let mut digest_data = vec![0u8; digest_data_size];
765
766
        let mut namespace_hasher = StdHasher::new();
767
        namespace.hash(&mut namespace_hasher);
768
        self.get_name().hash(&mut namespace_hasher);
769
        let hash_seed = namespace_hasher.finish();
770
771
        // Fill the digest data with random data based on a stable
772
        // hash of the namespace and store name. Intention is to
773
        // have randomly filled data that is unique per store and
774
        // does not change between health checks. This is to ensure
775
        // we are not adding more data to store on each health check.
776
        let mut rng: StdRng = StdRng::seed_from_u64(hash_seed);
777
        rng.fill_bytes(&mut digest_data);
778
779
        let mut digest_hasher = default_digest_hasher_func().hasher();
780
        digest_hasher.update(&digest_data);
781
        let digest_data_len = digest_data.len() as u64;
782
        let digest_info = StoreKey::from(digest_hasher.finalize_digest());
783
784
        let digest_bytes = Bytes::copy_from_slice(&digest_data);
785
786
        if let Err(e) = self
787
            .update_oneshot(digest_info.borrow(), digest_bytes.clone())
788
            .await
789
        {
790
            warn!(?e, "check_health Store.update_oneshot() failed");
791
            return HealthStatus::new_failed(
792
                self.get_ref(),
793
                format!("Store.update_oneshot() failed: {e}").into(),
794
            );
795
        }
796
797
        match self.has(digest_info.borrow()).await {
798
            Ok(Some(s)) => {
799
                if s != digest_data_len {
800
                    return HealthStatus::new_failed(
801
                        self.get_ref(),
802
                        format!("Store.has() size mismatch {s} != {digest_data_len}").into(),
803
                    );
804
                }
805
            }
806
            Ok(None) => {
807
                return HealthStatus::new_failed(
808
                    self.get_ref(),
809
                    "Store.has() size not found".into(),
810
                );
811
            }
812
            Err(e) => {
813
                return HealthStatus::new_failed(
814
                    self.get_ref(),
815
                    format!("Store.has() failed: {e}").into(),
816
                );
817
            }
818
        }
819
820
        match self
821
            .get_part_unchunked(digest_info, 0, Some(digest_data_len))
822
            .await
823
        {
824
            Ok(b) => {
825
                if b != digest_bytes {
826
                    return HealthStatus::new_failed(
827
                        self.get_ref(),
828
                        "Store.get_part_unchunked() data mismatch".into(),
829
                    );
830
                }
831
            }
832
            Err(e) => {
833
                return HealthStatus::new_failed(
834
                    self.get_ref(),
835
                    format!("Store.get_part_unchunked() failed: {e}").into(),
836
                );
837
            }
838
        }
839
840
        HealthStatus::new_ok(self.get_ref(), "Successfully store health check".into())
841
1
    }
842
843
    /// See: [`Store::inner_store`] for details.
844
    fn inner_store(&self, _digest: Option<StoreKey<'_>>) -> &dyn StoreDriver;
845
846
    /// Returns an Any variation of whatever Self is.
847
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static);
848
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static>;
849
850
    /// Register health checks used to monitor the store.
    ///
    /// The default implementation registers nothing.
    fn register_health(self: Arc<Self>, _registry: &mut HealthRegistryBuilder) {}
852
853
    fn register_remove_callback(
854
        self: Arc<Self>,
855
        callback: Arc<dyn RemoveItemCallback>,
856
    ) -> Result<(), Error>;
857
}
858
859
/// Callback to be called when a store deletes an item. This is used so
860
/// compound stores can remove items from their internal state when their
861
/// underlying stores remove items (e.g. caches).
862
pub trait RemoveItemCallback: Debug + Send + Sync {
863
    /// Receives the key of the item in question and returns a boxed, pinned
    /// future that resolves to `()`. The future may borrow both `self` and
    /// `store_key` for the lifetime `'a`.
    fn callback<'a>(
864
        &'a self,
865
        store_key: StoreKey<'a>,
866
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
867
}
868
869
/// The instructions on how to decode a value from a Bytes & version into
870
/// the underlying type.
871
pub trait SchedulerStoreDecodeTo {
872
    /// The type produced by a successful call to `decode`.
    type DecodeOutput;
873
    /// Decodes `data`, stored under the given `version`, into
    /// `Self::DecodeOutput`.
    ///
    /// This is an associated function (no `self`), so the decoding rules are
    /// attached to the type rather than to a value.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] when the implementation cannot decode `data`.
    fn decode(version: i64, data: Bytes) -> Result<Self::DecodeOutput, Error>;
874
}
875
876
/// A subscription handle; this is the bound placed on
/// `SchedulerSubscriptionManager::Subscription`.
pub trait SchedulerSubscription: Send + Sync {
877
    /// Returns a `Send` future that resolves to `Ok(())` or an [`Error`].
    /// NOTE(review): presumably it resolves once the subscribed key has
    /// changed — confirm against the implementors.
    fn changed(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
878
}
879
880
/// Creates [`SchedulerSubscription`]s for keys supplied through a
/// [`SchedulerStoreKeyProvider`].
pub trait SchedulerSubscriptionManager: Send + Sync {
881
    /// The concrete subscription type handed out by `subscribe`.
    type Subscription: SchedulerSubscription;
882
883
    /// Creates a subscription for `key`.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] when the implementation cannot create the
    /// subscription.
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
884
    where
885
        K: SchedulerStoreKeyProvider;
886
887
    /// Associated function (no `self`) describing the manager type as a whole.
    /// NOTE(review): presumably reports whether change notifications are
    /// guaranteed to be delivered — confirm against the implementors.
    fn is_reliable() -> bool;
888
}
889
890
/// The API surface for a scheduler store.
891
pub trait SchedulerStore: Send + Sync + 'static {
892
    /// The subscription manager type returned by `subscription_manager`.
    type SubscriptionManager: SchedulerSubscriptionManager;
893
894
    /// Returns the subscription manager for the scheduler store.
    ///
    /// The manager is handed out behind an `Arc`, so it can be shared.
895
    fn subscription_manager(
896
        &self,
897
    ) -> impl Future<Output = Result<Arc<Self::SubscriptionManager>, Error>> + Send;
898
899
    /// Updates or inserts an entry into the underlying store.
900
    /// Metadata about the key is attached to the compile-time type.
901
    /// If `SchedulerStoreKeyProvider::Versioned` is `TrueValue`, the data will
902
    /// not be updated if the current version in the database does not match
903
    /// the version in the passed in data.
904
    /// No guarantees are made when `Versioned` is `FalseValue`.
905
    /// Indexes are guaranteed to be updated atomically with the data.
    ///
    /// NOTE(review): the `Option<i64>` result is presumably the new version,
    /// with `None` meaning the version check rejected the write — TODO confirm
    /// against the implementors.
906
    fn update_data<T>(&self, data: T) -> impl Future<Output = Result<Option<i64>, Error>> + Send
907
    where
908
        T: SchedulerStoreDataProvider
909
            + SchedulerStoreKeyProvider
910
            + SchedulerCurrentVersionProvider
911
            + Send;
912
913
    /// Searches for all keys in the store that match the given index prefix.
    ///
    /// On success the future yields a stream; each stream item is either a
    /// value decoded through `K`'s [`SchedulerStoreDecodeTo`] implementation
    /// or an [`Error`] for that item.
914
    fn search_by_index_prefix<K>(
915
        &self,
916
        index: K,
917
    ) -> impl Future<
918
        Output = Result<
919
            impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
920
            Error,
921
        >,
922
    > + Send
923
    where
924
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send;
925
926
    /// Returns data for the provided key with the given version if
927
    /// `SchedulerStoreKeyProvider::Versioned` is `TrueValue`.
    ///
    /// The value is decoded through `K`'s [`SchedulerStoreDecodeTo`]
    /// implementation.
    /// NOTE(review): `None` presumably means the key was not found — confirm.
928
    fn get_and_decode<K>(
929
        &self,
930
        key: K,
931
    ) -> impl Future<Output = Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>> + Send
932
    where
933
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send;
934
}
935
936
/// A type that is used to let the scheduler store know what
937
/// index is being requested.
938
pub trait SchedulerIndexProvider {
939
    /// Only keys inserted with this prefix will be indexed.
940
    const KEY_PREFIX: &'static str;
941
942
    /// The name of the index.
943
    const INDEX_NAME: &'static str;
944
945
    /// The sort key for the index (if any).
    ///
    /// Defaults to `None`, i.e. no sort key, unless an implementor overrides it.
946
    const MAYBE_SORT_KEY: Option<&'static str> = None;
947
948
    /// If the data is versioned.
    ///
    /// Expressed as a type-level boolean (see [`BoolValue`]).
949
    type Versioned: BoolValue;
950
951
    /// The value of the index.
    ///
    /// Returned as a `Cow`, so implementors may either borrow from `self` or
    /// build an owned string.
952
    fn index_value(&self) -> Cow<'_, str>;
953
}
954
955
/// Provides a key to lookup data in the store.
956
pub trait SchedulerStoreKeyProvider {
957
    /// If the data is versioned.
    ///
    /// Expressed as a type-level boolean (see [`BoolValue`]). Types that set
    /// this to [`FalseValue`] automatically receive the blanket
    /// [`SchedulerCurrentVersionProvider`] implementation.
958
    type Versioned: BoolValue;
959
960
    /// Returns the key for the data.
    ///
    /// The key is `'static`, so it does not borrow from `self`.
961
    fn get_key(&self) -> StoreKey<'static>;
962
}
963
964
/// Provides data to be stored in the scheduler store.
965
pub trait SchedulerStoreDataProvider {
966
    /// Converts the data into bytes to be stored in the store.
    ///
    /// Consumes `self`.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] when the implementation cannot produce the bytes.
967
    fn try_into_bytes(self) -> Result<Bytes, Error>;
968
969
    /// Returns the indexes for the data if any.
    ///
    /// The default implementation reports no indexes (an empty `Vec`).
    /// NOTE(review): each tuple is presumably (index name, index value) —
    /// confirm against the implementors.
970
3
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
971
3
        Ok(Vec::new())
972
3
    }
973
}
974
975
/// Provides the current version of the data in the store.
///
/// A blanket implementation exists for every [`SchedulerStoreKeyProvider`]
/// whose `Versioned` type is [`FalseValue`].
976
pub trait SchedulerCurrentVersionProvider {
977
    /// Returns the current version of the data in the store.
978
    fn current_version(&self) -> i64;
979
}
980
981
/// Default implementation for when we are not providing a version
982
/// for the data.
///
/// Applies to every type whose [`SchedulerStoreKeyProvider::Versioned`] is
/// [`FalseValue`].
983
impl<T> SchedulerCurrentVersionProvider for T
984
where
985
    T: SchedulerStoreKeyProvider<Versioned = FalseValue>,
986
{
987
0
    /// Always reports version `0` for unversioned data.
    fn current_version(&self) -> i64 {
988
0
        0
989
0
    }
990
}
991
992
/// Compile time types for booleans.
///
/// Implemented by [`TrueValue`] and [`FalseValue`].
993
pub trait BoolValue {
994
    /// The boolean this type stands for.
    const VALUE: bool;
995
}
996
/// Compile time check if something is false.
///
/// Marker trait with no items; implemented by [`FalseValue`].
997
pub trait IsFalse {}
998
/// Compile time check if something is true.
///
/// Marker trait with no items; implemented by [`TrueValue`].
999
pub trait IsTrue {}
1000
1001
/// Compile time true value.
///
/// A unit struct used as a type-level `true`.
1002
#[derive(Debug, Clone, Copy)]
1003
pub struct TrueValue;
1004
// `TrueValue::VALUE` is `true`.
impl BoolValue for TrueValue {
1005
    const VALUE: bool = true;
1006
}
1007
// Marks `TrueValue` as satisfying `IsTrue` bounds.
impl IsTrue for TrueValue {}
1008
1009
/// Compile time false value.
///
/// A unit struct used as a type-level `false`.
1010
#[derive(Debug, Clone, Copy)]
1011
pub struct FalseValue;
1012
// `FalseValue::VALUE` is `false`.
impl BoolValue for FalseValue {
1013
    const VALUE: bool = false;
1014
}
1015
// Marks `FalseValue` as satisfying `IsFalse` bounds.
impl IsFalse for FalseValue {}