Coverage Report

Created: 2026-05-23 21:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/store_trait.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::borrow::{Borrow, BorrowMut};
16
use core::convert::Into;
17
use core::fmt::{self, Debug, Display};
18
use core::future;
19
use core::hash::{Hash, Hasher};
20
use core::ops::{Bound, RangeBounds};
21
use core::pin::Pin;
22
use core::ptr::addr_eq;
23
use core::time::Duration;
24
use std::borrow::Cow;
25
use std::collections::hash_map::DefaultHasher as StdHasher;
26
use std::ffi::OsString;
27
use std::sync::{Arc, OnceLock};
28
29
use async_trait::async_trait;
30
use bytes::{Bytes, BytesMut};
31
use futures::{Future, FutureExt, Stream, join, try_join};
32
use nativelink_error::{Code, Error, ResultExt, error_if, make_err};
33
use nativelink_metric::MetricsComponent;
34
use rand::rngs::StdRng;
35
use rand::{RngCore, SeedableRng};
36
use serde::{Deserialize, Serialize};
37
use tokio::io::{AsyncReadExt, AsyncSeekExt};
38
use tracing::warn;
39
40
use crate::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair};
41
use crate::common::DigestInfo;
42
use crate::digest_hasher::{DigestHasher, DigestHasherFunc, default_digest_hasher_func};
43
use crate::fs;
44
use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
45
46
static DEFAULT_DIGEST_SIZE_HEALTH_CHECK: OnceLock<usize> = OnceLock::new();
47
/// Default digest size for health check data. Any change in this value
48
/// changes the default contract. `GlobalConfig` should be updated to reflect
49
/// changes in this value.
50
pub const DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG: usize = 1024 * 1024;
51
52
// Get the default digest size for health check data, if value is unset a system wide default is used.
53
1
pub fn default_digest_size_health_check() -> usize {
54
1
    *DEFAULT_DIGEST_SIZE_HEALTH_CHECK.get_or_init(|| DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG)
55
1
}
56
57
/// Set the default digest size for health check data, this should be called once.
58
0
pub fn set_default_digest_size_health_check(size: usize) -> Result<(), Error> {
59
0
    DEFAULT_DIGEST_SIZE_HEALTH_CHECK.set(size).map_err(|_| {
60
0
        make_err!(
61
0
            Code::Internal,
62
            "set_default_digest_size_health_check already set"
63
        )
64
0
    })
65
0
}
66
67
#[derive(
68
    Debug,
69
    PartialEq,
70
    Eq,
71
    Copy,
72
    Clone,
73
    Serialize,
74
    Deserialize,
75
0
    wincode::SchemaWrite,
76
0
    wincode::SchemaRead,
77
)]
78
pub enum UploadSizeInfo {
79
    /// When the data transfer amount is known to be exact size, this enum should be used.
80
    /// The receiver store can use this to better optimize the way the data is sent or stored.
81
    ExactSize(u64),
82
83
    /// When the data transfer amount is not known to be exact, the caller should use this enum
84
    /// to provide the maximum size that could possibly be sent. This will bypass the exact size
85
    /// checks, but still provide useful information to the underlying store about the data being
86
    /// sent that it can then use to optimize the upload process.
87
    MaxSize(u64),
88
}
89
90
/// Utility to send all the data to the store from a file.
91
// Note: This is not inlined because some code may want to bypass any underlying
92
// optimizations that may be present in the inner store.
93
11
pub async fn slow_update_store_with_file<S: StoreDriver + ?Sized>(
94
11
    store: Pin<&S>,
95
11
    digest: impl Into<StoreKey<'_>>,
96
11
    file: &mut fs::FileSlot,
97
11
    upload_size: UploadSizeInfo,
98
11
) -> Result<(), Error> {
99
11
    file.rewind()
100
11
        .await
101
11
        .err_tip(|| "Failed to rewind in upload_file_to_store")
?0
;
102
11
    let (mut tx, rx) = make_buf_channel_pair();
103
104
11
    let update_fut = store
105
11
        .update(digest.into(), rx, upload_size)
106
11
        .map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store"));
107
11
    let read_data_fut = async move {
108
        loop {
109
85
            let mut buf = BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE);
110
85
            let read = file
111
85
                .read_buf(&mut buf)
112
85
                .await
113
85
                .err_tip(|| "Failed to read in upload_file_to_store")
?0
;
114
85
            if read == 0 {
115
11
                break;
116
74
            }
117
74
            tx.send(buf.freeze())
118
74
                .await
119
74
                .err_tip(|| "Failed to send in upload_file_to_store")
?0
;
120
        }
121
11
        tx.send_eof()
122
11
            .err_tip(|| "Could not send EOF to store in upload_file_to_store")
123
11
    };
124
11
    tokio::pin!(read_data_fut);
125
11
    let (update_res, read_res) = tokio::join!(update_fut, read_data_fut);
126
11
    update_res.merge(read_res)
127
11
}
128
129
/// Optimizations that stores may want to expose to the callers.
130
/// This is useful for specific cases when the store can optimize the processing
131
/// of the data being processed.
132
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
133
pub enum StoreOptimizations {
134
    /// The store can optimize the upload process when it knows the data is coming from a file.
135
    FileUpdates,
136
137
    /// If the store will ignore the data uploads.
138
    NoopUpdates,
139
140
    /// If the store will never serve downloads.
141
    NoopDownloads,
142
143
    /// If the store will determine whether a key has associated data once a read has been
144
    /// attempted instead of calling `.has()` first.
145
    LazyExistenceOnSync,
146
147
    /// The store provides an optimized `update_oneshot` implementation that bypasses
148
    /// channel overhead for direct Bytes writes. Stores with this optimization can
149
    /// accept complete data directly without going through the MPSC channel.
150
    SubscribesToUpdateOneshot,
151
}
152
153
/// A wrapper struct for [`StoreKey`] to work around
154
/// lifetime limitations in `HashMap::get()` as described in
155
/// <https://github.com/rust-lang/rust/issues/80389>
156
///
157
/// As such this is a wrapper type that is stored in the
158
/// maps using the workaround as described in
159
/// <https://blinsay.com/blog/compound-keys/>
160
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
161
#[repr(transparent)]
162
pub struct StoreKeyBorrow(StoreKey<'static>);
163
164
impl From<StoreKey<'static>> for StoreKeyBorrow {
165
5.94k
    fn from(key: StoreKey<'static>) -> Self {
166
5.94k
        Self(key)
167
5.94k
    }
168
}
169
170
impl From<StoreKeyBorrow> for StoreKey<'static> {
171
0
    fn from(key_borrow: StoreKeyBorrow) -> Self {
172
0
        key_borrow.0
173
0
    }
174
}
175
176
impl<'a> Borrow<StoreKey<'a>> for StoreKeyBorrow {
177
4.91k
    fn borrow(&self) -> &StoreKey<'a> {
178
4.91k
        &self.0
179
4.91k
    }
180
}
181
182
impl<'a> Borrow<StoreKey<'a>> for &StoreKeyBorrow {
183
0
    fn borrow(&self) -> &StoreKey<'a> {
184
0
        &self.0
185
0
    }
186
}
187
188
/// Holds something that can be converted into a key the
189
/// store API can understand. Generally this is a digest
190
/// but it can also be a string if the caller wishes to
191
/// store the data directly and reference it by a string
192
/// directly.
193
#[derive(Debug, Eq)]
194
pub enum StoreKey<'a> {
195
    /// A string key.
196
    Str(Cow<'a, str>),
197
198
    /// A key that is a digest.
199
    Digest(DigestInfo),
200
}
201
202
impl<'a> StoreKey<'a> {
203
    /// Creates a new store key from a string.
204
26
    pub const fn new_str(s: &'a str) -> Self {
205
26
        StoreKey::Str(Cow::Borrowed(s))
206
26
    }
207
208
    /// Returns a shallow clone of the key.
209
    /// This is extremely cheap and should be used when clone
210
    /// is needed but the key is not going to be modified.
211
    #[must_use]
212
    #[allow(
213
        clippy::missing_const_for_fn,
214
        reason = "False positive on stable, but not on nightly"
215
    )]
216
4.17k
    pub fn borrow(&'a self) -> Self {
217
56
        match self {
218
15
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Borrowed(s)),
219
41
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Borrowed(s)),
220
4.11k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
221
        }
222
4.17k
    }
223
224
    /// Converts the key into an owned version. This is useful
225
    /// when the caller needs an owned version of the key.
226
12.6k
    pub fn into_owned(self) -> StoreKey<'static> {
227
70
        match self {
228
14
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Owned(s)),
229
56
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Owned(s.to_owned())),
230
12.5k
            StoreKey::Digest(d) => StoreKey::Digest(d),
231
        }
232
12.6k
    }
233
234
    /// Converts the key into a digest. This is useful when the caller
235
    /// must have a digest key. If the data is not a digest, it may
236
    /// hash the underlying key and return a digest of the hash of the key
237
142
    pub fn into_digest(self) -> DigestInfo {
238
142
        match self {
239
140
            StoreKey::Digest(digest) => digest,
240
2
            StoreKey::Str(s) => {
241
2
                let mut hasher = DigestHasherFunc::Blake3.hasher();
242
2
                hasher.update(s.as_bytes());
243
2
                hasher.finalize_digest()
244
            }
245
        }
246
142
    }
247
248
    /// Returns the key as a string. If the key is a digest, it will
249
    /// return a string representation of the digest. If the key is a string,
250
    /// it will return the string itself.
251
674
    pub fn as_str(&'a self) -> Cow<'a, str> {
252
539
        match self {
253
538
            StoreKey::Str(Cow::Owned(s)) => Cow::Borrowed(s),
254
1
            StoreKey::Str(Cow::Borrowed(s)) => Cow::Borrowed(s),
255
135
            StoreKey::Digest(d) => Cow::Owned(format!("{d}")),
256
        }
257
674
    }
258
}
259
260
impl Clone for StoreKey<'static> {
261
10.4k
    fn clone(&self) -> Self {
262
10.4k
        match self {
263
28
            StoreKey::Str(s) => StoreKey::Str(s.clone()),
264
10.3k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
265
        }
266
10.4k
    }
267
}
268
269
impl PartialOrd for StoreKey<'_> {
270
40
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
271
40
        Some(self.cmp(other))
272
40
    }
273
}
274
275
impl Ord for StoreKey<'_> {
276
57
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
277
57
        match (self, other) {
278
57
            (StoreKey::Str(a), StoreKey::Str(b)) => a.cmp(b),
279
0
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a.cmp(b),
280
0
            (StoreKey::Str(_), StoreKey::Digest(_)) => core::cmp::Ordering::Less,
281
0
            (StoreKey::Digest(_), StoreKey::Str(_)) => core::cmp::Ordering::Greater,
282
        }
283
57
    }
284
}
285
286
impl PartialEq for StoreKey<'_> {
287
5.91k
    fn eq(&self, other: &Self) -> bool {
288
5.91k
        match (self, other) {
289
51
            (StoreKey::Str(a), StoreKey::Str(b)) => a == b,
290
5.86k
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a == b,
291
0
            _ => false,
292
        }
293
5.91k
    }
294
}
295
296
impl Hash for StoreKey<'_> {
297
26.0k
    fn hash<H: Hasher>(&self, state: &mut H) {
298
        /// Salts the hash with the enum value that represents
299
        /// the type of the key.
300
        #[repr(u8)]
301
        enum HashId {
302
            Str = 0,
303
            Digest = 1,
304
        }
305
26.0k
        match self {
306
44
            StoreKey::Str(s) => {
307
44
                (HashId::Str as u8).hash(state);
308
44
                s.hash(state);
309
44
            }
310
25.9k
            StoreKey::Digest(d) => {
311
25.9k
                (HashId::Digest as u8).hash(state);
312
25.9k
                d.hash(state);
313
25.9k
            }
314
        }
315
26.0k
    }
316
}
317
318
impl<'a> From<&'a str> for StoreKey<'a> {
319
0
    fn from(s: &'a str) -> Self {
320
0
        StoreKey::Str(Cow::Borrowed(s))
321
0
    }
322
}
323
324
impl From<String> for StoreKey<'static> {
325
0
    fn from(s: String) -> Self {
326
0
        StoreKey::Str(Cow::Owned(s))
327
0
    }
328
}
329
330
impl From<DigestInfo> for StoreKey<'_> {
331
10.7k
    fn from(d: DigestInfo) -> Self {
332
10.7k
        StoreKey::Digest(d)
333
10.7k
    }
334
}
335
336
impl From<&DigestInfo> for StoreKey<'_> {
337
71
    fn from(d: &DigestInfo) -> Self {
338
71
        StoreKey::Digest(*d)
339
71
    }
340
}
341
342
// mostly for use with tracing::Value
343
impl Display for StoreKey<'_> {
344
32
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
345
32
        match self {
346
29
            StoreKey::Str(s) => {
347
29
                write!(f, "{s}")
348
            }
349
3
            StoreKey::Digest(d) => {
350
3
                write!(f, "Digest: {d}")
351
            }
352
        }
353
32
    }
354
}
355
356
#[derive(Clone, MetricsComponent)]
357
#[repr(transparent)]
358
pub struct Store {
359
    #[metric]
360
    inner: Arc<dyn StoreDriver>,
361
}
362
363
impl Debug for Store {
364
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
365
0
        f.debug_struct("Store").finish_non_exhaustive()
366
0
    }
367
}
368
369
impl Store {
370
378
    pub fn new(inner: Arc<dyn StoreDriver>) -> Self {
371
378
        Self { inner }
372
378
    }
373
374
    /// Returns the immediate inner store driver.
375
    /// Note: This does not recursively try to resolve underlying store drivers
376
    /// like `.inner_store()` does.
377
    #[inline]
378
1
    pub fn into_inner(self) -> Arc<dyn StoreDriver> {
379
1
        self.inner
380
1
    }
381
382
    /// Gets the underlying store for the given digest.
383
    /// A caller might want to use this to obtain a reference to the "real" underlying store
384
    /// (if applicable) and check if it implements some special traits that allow optimizations.
385
    /// Note: If the store performs complex operations on the data, it should return itself.
386
    #[inline]
387
341
    pub fn inner_store<'a, K: Into<StoreKey<'a>>>(&self, digest: Option<K>) -> &dyn StoreDriver {
388
341
        self.inner.inner_store(digest.map(Into::into))
389
341
    }
390
391
    /// Tries to cast the underlying store to the given type.
392
    #[inline]
393
75
    pub fn downcast_ref<U: StoreDriver>(&self, maybe_digest: Option<StoreKey<'_>>) -> Option<&U> {
394
75
        self.inner.inner_store(maybe_digest).as_any().downcast_ref()
395
75
    }
396
397
    /// Register health checks used to monitor the store.
398
    #[inline]
399
0
    pub fn register_health(&self, registry: &mut HealthRegistryBuilder) {
400
0
        self.inner.clone().register_health(registry);
401
0
    }
402
403
    #[inline]
404
8
    pub fn register_remove_callback(
405
8
        &self,
406
8
        callback: Arc<dyn RemoveItemCallback>,
407
8
    ) -> Result<(), Error> {
408
8
        self.inner.clone().register_remove_callback(callback)
409
8
    }
410
}
411
412
impl StoreLike for Store {
413
    #[inline]
414
10.3k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
415
10.3k
        self.inner.as_ref()
416
10.3k
    }
417
418
12
    fn as_pin(&self) -> Pin<&Self> {
419
12
        Pin::new(self)
420
12
    }
421
}
422
423
impl<T> StoreLike for T
424
where
425
    T: StoreDriver + Sized,
426
{
427
    #[inline]
428
8.20k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
429
8.20k
        self
430
8.20k
    }
431
432
117
    fn as_pin(&self) -> Pin<&Self> {
433
117
        Pin::new(self)
434
117
    }
435
}
436
437
pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
438
    /// Returns the immediate inner store driver.
439
    fn as_store_driver(&self) -> &'_ dyn StoreDriver;
440
441
    /// Utility function to return a pinned reference to self.
442
    fn as_pin(&self) -> Pin<&Self>;
443
444
    /// Utility function to return a pinned reference to the store driver.
445
    #[inline]
446
18.5k
    fn as_store_driver_pin(&self) -> Pin<&'_ dyn StoreDriver> {
447
18.5k
        Pin::new(self.as_store_driver())
448
18.5k
    }
449
450
    /// Look up a digest in the store and return None if it does not exist in
451
    /// the store, or Some(size) if it does.
452
    /// Note: On an AC store the size will be incorrect and should not be used!
453
    #[inline]
454
351
    fn has<'a>(
455
351
        &'a self,
456
351
        digest: impl Into<StoreKey<'a>>,
457
351
    ) -> impl Future<Output = Result<Option<u64>, Error>> + 'a {
458
351
        self.as_store_driver_pin().has(digest.into())
459
351
    }
460
461
    /// Look up a list of digests in the store and return a result for each in
462
    /// the same order as input.  The result will either be None if it does not
463
    /// exist in the store, or Some(size) if it does.
464
    /// Note: On an AC store the size will be incorrect and should not be used!
465
    #[inline]
466
23
    fn has_many<'a>(
467
23
        &'a self,
468
23
        digests: &'a [StoreKey<'a>],
469
23
    ) -> impl Future<Output = Result<Vec<Option<u64>>, Error>> + Send + 'a {
470
23
        if digests.is_empty() {
471
3
            return future::ready(Ok(vec![])).boxed();
472
20
        }
473
20
        self.as_store_driver_pin().has_many(digests)
474
23
    }
475
476
    /// The implementation of the above has and `has_many` functions.  See their
477
    /// documentation for details.
478
    #[inline]
479
86
    fn has_with_results<'a>(
480
86
        &'a self,
481
86
        digests: &'a [StoreKey<'a>],
482
86
        results: &'a mut [Option<u64>],
483
86
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
484
86
        if digests.is_empty() {
485
3
            return future::ready(Ok(())).boxed();
486
83
        }
487
83
        self.as_store_driver_pin()
488
83
            .has_with_results(digests, results)
489
86
    }
490
491
    /// List all the keys in the store that are within the given range.
492
    /// `handler` is called for each key in the range. If `handler` returns
493
    /// false, the listing is stopped.
494
    ///
495
    /// The number of keys passed through the handler is the return value.
496
    #[inline]
497
15
    fn list<'a, 'b>(
498
15
        &'a self,
499
15
        range: impl RangeBounds<StoreKey<'b>> + Send + 'b,
500
15
        mut handler: impl for<'c> FnMut(&'c StoreKey) -> bool + Send + Sync + 'a,
501
15
    ) -> impl Future<Output = Result<u64, Error>> + Send + 'a
502
15
    where
503
15
        'b: 'a,
504
    {
505
        // Note: We use a manual async move, so the future can own the `range` and `handler`,
506
        // otherwise we'd require the caller to pass them in by reference making more borrow
507
        // checker noise.
508
15
        async move {
509
15
            self.as_store_driver_pin()
510
15
                .list(
511
15
                    (
512
15
                        range.start_bound().map(StoreKey::borrow),
513
15
                        range.end_bound().map(StoreKey::borrow),
514
15
                    ),
515
15
                    &mut handler,
516
15
                )
517
15
                .await
518
15
        }
519
15
    }
520
521
    /// Sends the data to the store.
522
    #[inline]
523
5.32k
    fn update<'a>(
524
5.32k
        &'a self,
525
5.32k
        digest: impl Into<StoreKey<'a>>,
526
5.32k
        reader: DropCloserReadHalf,
527
5.32k
        upload_size: UploadSizeInfo,
528
5.32k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
529
5.32k
        self.as_store_driver_pin()
530
5.32k
            .update(digest.into(), reader, upload_size)
531
5.32k
    }
532
533
    /// Any optimizations the store might want to expose to the callers.
534
    /// By default, no optimizations are exposed.
535
    #[inline]
536
25
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
537
25
        self.as_store_driver_pin().optimized_for(optimization)
538
25
    }
539
540
    /// Specialized version of `.update()` which takes a `FileSlot`.
541
    /// This is useful if the underlying store can optimize the upload process
542
    /// when it knows the data is coming from a file.
543
    #[inline]
544
14
    fn update_with_whole_file<'a>(
545
14
        &'a self,
546
14
        digest: impl Into<StoreKey<'a>>,
547
14
        path: OsString,
548
14
        file: fs::FileSlot,
549
14
        upload_size: UploadSizeInfo,
550
14
    ) -> impl Future<Output = Result<Option<fs::FileSlot>, Error>> + Send + 'a {
551
14
        self.as_store_driver_pin()
552
14
            .update_with_whole_file(digest.into(), path, file, upload_size)
553
14
    }
554
555
    /// Utility to send all the data to the store when you have all the bytes.
556
    #[inline]
557
5.72k
    fn update_oneshot<'a>(
558
5.72k
        &'a self,
559
5.72k
        digest: impl Into<StoreKey<'a>>,
560
5.72k
        data: Bytes,
561
5.72k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
562
5.72k
        self.as_store_driver_pin()
563
5.72k
            .update_oneshot(digest.into(), data)
564
5.72k
    }
565
566
    /// Retrieves part of the data from the store and writes it to the given writer.
567
    #[inline]
568
1.38k
    fn get_part<'a>(
569
1.38k
        &'a self,
570
1.38k
        digest: impl Into<StoreKey<'a>>,
571
1.38k
        mut writer: impl BorrowMut<DropCloserWriteHalf> + Send + 'a,
572
1.38k
        offset: u64,
573
1.38k
        length: Option<u64>,
574
1.38k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
575
1.38k
        let key = digest.into();
576
        // Note: We need to capture `writer` just in case the caller
577
        // expects the drop() method to be called on it when the future
578
        // is done due to the complex interaction between the DropCloserWriteHalf
579
        // and the DropCloserReadHalf during drop().
580
1.38k
        async move {
581
1.38k
            self.as_store_driver_pin()
582
1.38k
                .get_part(key, writer.borrow_mut(), offset, length)
583
1.38k
                .await
584
1.38k
        }
585
1.38k
    }
586
587
    /// Utility that works the same as `.get_part()`, but writes all the data.
588
    #[inline]
589
23
    fn get<'a>(
590
23
        &'a self,
591
23
        key: impl Into<StoreKey<'a>>,
592
23
        writer: DropCloserWriteHalf,
593
23
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
594
23
        self.as_store_driver_pin().get(key.into(), writer)
595
23
    }
596
597
    /// Utility that will return all the bytes at once instead of in a streaming manner.
598
    #[inline]
599
5.31k
    fn get_part_unchunked<'a>(
600
5.31k
        &'a self,
601
5.31k
        key: impl Into<StoreKey<'a>>,
602
5.31k
        offset: u64,
603
5.31k
        length: Option<u64>,
604
5.31k
    ) -> impl Future<Output = Result<Bytes, Error>> + Send + 'a {
605
5.31k
        self.as_store_driver_pin()
606
5.31k
            .get_part_unchunked(key.into(), offset, length)
607
5.31k
    }
608
609
    /// Default implementation of the health check. Some stores may want to override this
610
    /// in situations where the default implementation is not sufficient.
611
    #[inline]
612
1
    fn check_health(
613
1
        &self,
614
1
        namespace: Cow<'static, str>,
615
1
    ) -> impl Future<Output = HealthStatus> + Send {
616
1
        self.as_store_driver_pin().check_health(namespace)
617
1
    }
618
}
619
620
#[async_trait]
621
pub trait StoreDriver:
622
    Sync + Send + Unpin + MetricsComponent + HealthStatusIndicator + 'static
623
{
624
    /// See: [`StoreLike::has`] for details.
625
    #[inline]
626
382
    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
627
        let mut result = [None];
628
        self.has_with_results(&[key], &mut result).await?;
629
        Ok(result[0])
630
382
    }
631
632
    /// See: [`StoreLike::has_many`] for details.
633
    #[inline]
634
    async fn has_many(
635
        self: Pin<&Self>,
636
        digests: &[StoreKey<'_>],
637
20
    ) -> Result<Vec<Option<u64>>, Error> {
638
        let mut results = vec![None; digests.len()];
639
        self.has_with_results(digests, &mut results).await?;
640
        Ok(results)
641
20
    }
642
643
    /// See: [`StoreLike::has_with_results`] for details.
644
    async fn has_with_results(
645
        self: Pin<&Self>,
646
        digests: &[StoreKey<'_>],
647
        results: &mut [Option<u64>],
648
    ) -> Result<(), Error>;
649
650
    /// See: [`StoreLike::list`] for details.
651
    async fn list(
652
        self: Pin<&Self>,
653
        _range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
654
        _handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
655
0
    ) -> Result<u64, Error> {
656
        // TODO(palfrey) We should force all stores to implement this function instead of
657
        // providing a default implementation.
658
        Err(make_err!(
659
            Code::Unimplemented,
660
            "Store::list() not implemented for this store"
661
        ))
662
0
    }
663
664
    /// See: [`StoreLike::update`] for details.
665
    async fn update(
666
        self: Pin<&Self>,
667
        key: StoreKey<'_>,
668
        reader: DropCloserReadHalf,
669
        upload_size: UploadSizeInfo,
670
    ) -> Result<(), Error>;
671
672
    /// See: [`StoreLike::optimized_for`] for details.
673
13
    fn optimized_for(&self, _optimization: StoreOptimizations) -> bool {
674
13
        false
675
13
    }
676
677
    /// See: [`StoreLike::update_with_whole_file`] for details.
678
    async fn update_with_whole_file(
679
        self: Pin<&Self>,
680
        key: StoreKey<'_>,
681
        path: OsString,
682
        mut file: fs::FileSlot,
683
        upload_size: UploadSizeInfo,
684
1
    ) -> Result<Option<fs::FileSlot>, Error> {
685
        let inner_store = self.inner_store(Some(key.borrow()));
686
        if inner_store.optimized_for(StoreOptimizations::FileUpdates) {
687
            error_if!(
688
                addr_eq(inner_store, &raw const *self),
689
                "Store::inner_store() returned self when optimization present"
690
            );
691
            return Pin::new(inner_store)
692
                .update_with_whole_file(key, path, file, upload_size)
693
                .await;
694
        }
695
        slow_update_store_with_file(self, key, &mut file, upload_size).await?;
696
        Ok(Some(file))
697
1
    }
698
699
    /// See: [`StoreLike::update_oneshot`] for details.
700
5.19k
    async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> {
701
        // TODO(palfrey) This is extremely inefficient, since we have exactly
702
        // what we need here. Maybe we could instead make a version of the stream
703
        // that can take objects already fully in memory instead?
704
        let (mut tx, rx) = make_buf_channel_pair();
705
706
        let data_len =
707
            u64::try_from(data.len()).err_tip(|| "Could not convert data.len() to u64")?;
708
5.19k
        let send_fut = async move {
709
            // Only send if we are not EOF.
710
5.19k
            if !data.is_empty() {
711
5.14k
                tx.send(data)
712
5.14k
                    .await
713
5.14k
                    .err_tip(|| "Failed to write data in update_oneshot")
?0
;
714
49
            }
715
5.19k
            tx.send_eof()
716
5.19k
                .err_tip(|| "Failed to write EOF in update_oneshot")
?0
;
717
5.19k
            Ok(())
718
5.19k
        };
719
        try_join!(
720
            send_fut,
721
            self.update(key, rx, UploadSizeInfo::ExactSize(data_len))
722
        )?;
723
        Ok(())
724
5.19k
    }
725
726
    /// See: [`StoreLike::get_part`] for details.
727
    async fn get_part(
728
        self: Pin<&Self>,
729
        key: StoreKey<'_>,
730
        writer: &mut DropCloserWriteHalf,
731
        offset: u64,
732
        length: Option<u64>,
733
    ) -> Result<(), Error>;
734
735
    /// See: [`StoreLike::get`] for details.
736
    #[inline]
737
    async fn get(
738
        self: Pin<&Self>,
739
        key: StoreKey<'_>,
740
        mut writer: DropCloserWriteHalf,
741
23
    ) -> Result<(), Error> {
742
        self.get_part(key, &mut writer, 0, None).await
743
23
    }
744
745
    /// See: [`StoreLike::get_part_unchunked`] for details.
746
    async fn get_part_unchunked(
747
        self: Pin<&Self>,
748
        key: StoreKey<'_>,
749
        offset: u64,
750
        length: Option<u64>,
751
5.46k
    ) -> Result<Bytes, Error> {
752
        let length_usize = length
753
2.30k
            .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
754
            .transpose()?;
755
756
        // TODO(palfrey) This is extremely inefficient, since we have exactly
757
        // what we need here. Maybe we could instead make a version of the stream
758
        // that can take objects already fully in memory instead?
759
        let (mut tx, mut rx) = make_buf_channel_pair();
760
761
        let (data_res, get_part_res) = join!(
762
            rx.consume(length_usize),
763
            // We use a closure here to ensure that the `tx` is dropped when the
764
            // future is done.
765
5.46k
            async move { self.get_part(key, &mut tx, offset, length).await 
}5.46k
,
766
        );
767
        get_part_res
768
            .err_tip(|| "Failed to get_part in get_part_unchunked")
769
            .merge(data_res.err_tip(|| "Failed to read stream to completion in get_part_unchunked"))
770
5.46k
    }
771
772
    /// See: [`StoreLike::check_health`] for details.
773
1
    async fn check_health(self: Pin<&Self>, namespace: Cow<'static, str>) -> HealthStatus {
774
        let digest_data_size = default_digest_size_health_check();
775
        let mut digest_data = vec![0u8; digest_data_size];
776
777
        let mut namespace_hasher = StdHasher::new();
778
        namespace.hash(&mut namespace_hasher);
779
        self.get_name().hash(&mut namespace_hasher);
780
        let hash_seed = namespace_hasher.finish();
781
782
        // Fill the digest data with random data based on a stable
783
        // hash of the namespace and store name. Intention is to
784
        // have randomly filled data that is unique per store and
785
        // does not change between health checks. This is to ensure
786
        // we are not adding more data to store on each health check.
787
        let mut rng: StdRng = StdRng::seed_from_u64(hash_seed);
788
        rng.fill_bytes(&mut digest_data);
789
790
        let mut digest_hasher = default_digest_hasher_func().hasher();
791
        digest_hasher.update(&digest_data);
792
        let digest_data_len = digest_data.len() as u64;
793
        let digest_info = StoreKey::from(digest_hasher.finalize_digest());
794
795
        let digest_bytes = Bytes::copy_from_slice(&digest_data);
796
797
        if let Err(e) = self
798
            .update_oneshot(digest_info.borrow(), digest_bytes.clone())
799
            .await
800
        {
801
            warn!(?e, "check_health Store.update_oneshot() failed");
802
            return HealthStatus::new_failed(
803
                self.get_ref(),
804
                format!("Store.update_oneshot() failed: {e}").into(),
805
            );
806
        }
807
808
        match self.has(digest_info.borrow()).await {
809
            Ok(Some(s)) => {
810
                if s != digest_data_len {
811
                    return HealthStatus::new_failed(
812
                        self.get_ref(),
813
                        format!("Store.has() size mismatch {s} != {digest_data_len}").into(),
814
                    );
815
                }
816
            }
817
            Ok(None) => {
818
                return HealthStatus::new_failed(
819
                    self.get_ref(),
820
                    "Store.has() size not found".into(),
821
                );
822
            }
823
            Err(e) => {
824
                return HealthStatus::new_failed(
825
                    self.get_ref(),
826
                    format!("Store.has() failed: {e}").into(),
827
                );
828
            }
829
        }
830
831
        match self
832
            .get_part_unchunked(digest_info, 0, Some(digest_data_len))
833
            .await
834
        {
835
            Ok(b) => {
836
                if b != digest_bytes {
837
                    return HealthStatus::new_failed(
838
                        self.get_ref(),
839
                        "Store.get_part_unchunked() data mismatch".into(),
840
                    );
841
                }
842
            }
843
            Err(e) => {
844
                return HealthStatus::new_failed(
845
                    self.get_ref(),
846
                    format!("Store.get_part_unchunked() failed: {e}").into(),
847
                );
848
            }
849
        }
850
851
        HealthStatus::new_ok(self.get_ref(), "Successfully store health check".into())
852
1
    }
853
854
    /// See: [`Store::inner_store`] for details.
855
    fn inner_store(&self, _digest: Option<StoreKey<'_>>) -> &dyn StoreDriver;
856
857
    /// Returns an Any variation of whatever Self is.
858
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static);
859
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static>;
860
861
    // Register health checks used to monitor the store.
862
0
    fn register_health(self: Arc<Self>, _registry: &mut HealthRegistryBuilder) {}
863
864
    fn register_remove_callback(
865
        self: Arc<Self>,
866
        callback: Arc<dyn RemoveItemCallback>,
867
    ) -> Result<(), Error>;
868
}
869
870
/// Callback to be called when a store deletes an item. This is used so
871
/// compound stores can remove items from their internal state when their
872
/// underlying stores remove items (e.g. caches).
873
pub trait RemoveItemCallback: Debug + Send + Sync {
874
    /// Handles the removal of `store_key`.
    ///
    /// Returns a boxed, pinned, `Send` future whose output is `()`; the
    /// future may borrow both `self` and `store_key` for the lifetime `'a`.
    fn callback<'a>(
875
        &'a self,
876
        store_key: StoreKey<'a>,
877
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
878
}
879
880
/// Describes how to decode a stored value — raw [`Bytes`] plus a version
881
/// number — into the underlying type.
882
pub trait SchedulerStoreDecodeTo {
883
    /// The type produced by a successful [`Self::decode`].
    type DecodeOutput;
884
    /// Decodes `data`, together with its accompanying `version`, into
    /// [`Self::DecodeOutput`], or returns an [`Error`].
    ///
    /// This is an associated function: it takes no `self` receiver.
    fn decode(version: i64, data: Bytes) -> Result<Self::DecodeOutput, Error>;
885
}
886
887
/// A subscription handle, as produced by
/// [`SchedulerSubscriptionManager::subscribe`].
pub trait SchedulerSubscription: Send + Sync {
888
    /// Returns a `Send` future that resolves to `Ok(())` or an [`Error`].
    // NOTE(review): presumably the future completes once the subscribed key
    // has changed — confirm against implementors.
    fn changed(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
889
}
890
891
/// Hands out [`SchedulerSubscription`]s for keys in a scheduler store.
pub trait SchedulerSubscriptionManager: Send + Sync {
892
    /// The concrete subscription type returned by [`Self::subscribe`].
    type Subscription: SchedulerSubscription;
893
894
    /// Returns a [`Self::Subscription`] for `key`, or an [`Error`].
    ///
    /// This call is synchronous; it returns a `Result`, not a future.
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
895
    where
896
        K: SchedulerStoreKeyProvider;
897
898
    /// Type-level flag (no `self` receiver) reported by the implementation.
    // NOTE(review): the meaning of "reliable" is not defined here; presumably
    // whether change notifications are guaranteed to be delivered — confirm.
    fn is_reliable() -> bool;
899
}
900
901
/// The API surface for a scheduler store.
902
pub trait SchedulerStore: Send + Sync + 'static {
903
    /// The subscription manager type handed out by
    /// [`Self::subscription_manager`].
    type SubscriptionManager: SchedulerSubscriptionManager;
904
905
    /// Returns the subscription manager for the scheduler store.
    ///
    /// The returned future resolves to a shared (`Arc`) manager or an [`Error`].
906
    fn subscription_manager(
907
        &self,
908
    ) -> impl Future<Output = Result<Arc<Self::SubscriptionManager>, Error>> + Send;
909
910
    /// Updates or inserts an entry into the underlying store.
911
    /// Metadata about the key is attached to the compile-time type.
912
    /// If `SchedulerStoreKeyProvider::Versioned` is `TrueValue`, the data will not
913
    /// be updated if the current version in the database does not match
914
    /// the version in the passed in data.
915
    /// No guarantees are made when `Versioned` is `FalseValue`.
916
    /// Indexes are guaranteed to be updated atomically with the data.
    // NOTE(review): `expiry` is presumably a time-to-live for the entry, and the
    // `Option<i64>` result presumably carries the new version (`None` on a
    // version mismatch) — neither is established here; confirm with implementors.
917
    fn update_data<T>(
918
        &self,
919
        data: T,
920
        expiry: Option<Duration>,
921
    ) -> impl Future<Output = Result<Option<i64>, Error>> + Send
922
    where
923
        T: SchedulerStoreDataProvider
924
            + SchedulerStoreKeyProvider
925
            + SchedulerCurrentVersionProvider
926
            + Send;
927
928
    /// Searches for all keys in the store that match the given index prefix.
    ///
    /// The returned future resolves to a `Send` stream; each stream item is
    /// either a value decoded via `K`'s [`SchedulerStoreDecodeTo`] impl or an
    /// [`Error`].
929
    fn search_by_index_prefix<K>(
930
        &self,
931
        index: K,
932
    ) -> impl Future<
933
        Output = Result<
934
            impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
935
            Error,
936
        >,
937
    > + Send
938
    where
939
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
940
        <K as SchedulerStoreDecodeTo>::DecodeOutput: Send;
941
942
    /// Returns data for the provided key with the given version if
943
    /// `SchedulerStoreKeyProvider::Versioned` is `TrueValue`.
    // NOTE(review): the `Option` in the result is presumably `None` when the key
    // is absent — confirm with implementors.
944
    fn get_and_decode<K>(
945
        &self,
946
        key: K,
947
    ) -> impl Future<Output = Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>> + Send
948
    where
949
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send;
950
}
951
952
/// A type that tells the scheduler store which index is being
953
/// requested.
954
pub trait SchedulerIndexProvider {
955
    /// Only keys inserted with this prefix will be indexed.
956
    const KEY_PREFIX: &'static str;
957
958
    /// The name of the index.
959
    const INDEX_NAME: &'static str;
960
961
    /// The sort key for the index (if any). Defaults to `None`.
962
    const MAYBE_SORT_KEY: Option<&'static str> = None;
963
964
    /// Whether the data is versioned, expressed as a compile-time
    /// [`BoolValue`] type.
965
    type Versioned: BoolValue;
966
967
    /// The value of the index, either borrowed from `self` or owned.
968
    fn index_value(&self) -> Cow<'_, str>;
969
}
970
971
/// Provides a key to look up data in the store.
972
pub trait SchedulerStoreKeyProvider {
973
    /// Whether the data is versioned, expressed as a compile-time
    /// [`BoolValue`] type.
974
    type Versioned: BoolValue;
975
976
    /// Returns the key for the data as a [`StoreKey`] with a `'static` lifetime.
977
    fn get_key(&self) -> StoreKey<'static>;
978
}
979
980
/// Provides data to be stored in the scheduler store.
981
pub trait SchedulerStoreDataProvider {
982
    /// Converts the data into bytes to be stored in the store.
    ///
    /// Consumes `self`; returns an [`Error`] if the conversion fails.
983
    fn try_into_bytes(self) -> Result<Bytes, Error>;
984
985
    /// Returns the indexes for the data if any.
    ///
    /// The default implementation returns an empty list.
    // NOTE(review): each tuple is presumably (index name, index value) — confirm.
986
3
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
987
3
        Ok(Vec::new())
988
3
    }
989
}
990
991
/// Provides the current version of the data in the store.
///
/// A blanket implementation in this file covers every
/// [`SchedulerStoreKeyProvider`] whose `Versioned` type is [`FalseValue`].
992
pub trait SchedulerCurrentVersionProvider {
993
    /// Returns the current version of the data in the store.
994
    fn current_version(&self) -> i64;
995
}
996
997
/// Default implementation for when we are not providing a version
998
/// for the data.
///
/// Applies to every `T` whose `SchedulerStoreKeyProvider::Versioned` is
/// `FalseValue`.
999
impl<T> SchedulerCurrentVersionProvider for T
1000
where
1001
    T: SchedulerStoreKeyProvider<Versioned = FalseValue>,
1002
{
1003
0
    /// Always reports version `0` for unversioned data.
    fn current_version(&self) -> i64 {
1004
0
        0
1005
0
    }
1006
}
1007
1008
/// Compile time types for booleans.
///
/// Implemented in this file by [`TrueValue`] and [`FalseValue`].
1009
pub trait BoolValue {
1010
    /// The `bool` that the implementing type stands for.
    const VALUE: bool;
1011
}
1012
/// Compile time check if something is false.
///
/// Marker trait with no items; implemented in this file for [`FalseValue`].
1013
pub trait IsFalse {}
1014
/// Compile time check if something is true.
///
/// Marker trait with no items; implemented in this file for [`TrueValue`].
1015
pub trait IsTrue {}
1016
1017
/// Compile time true value.
///
/// A unit struct whose [`BoolValue::VALUE`] is `true`; it also carries the
/// [`IsTrue`] marker.
1018
#[derive(Debug, Clone, Copy)]
1019
pub struct TrueValue;
1020
impl BoolValue for TrueValue {
1021
    const VALUE: bool = true;
1022
}
1023
// Marker impl: lets `TrueValue` satisfy `IsTrue` bounds.
impl IsTrue for TrueValue {}
1024
1025
/// Compile time false value.
///
/// A unit struct whose [`BoolValue::VALUE`] is `false`; it also carries the
/// [`IsFalse`] marker.
1026
#[derive(Debug, Clone, Copy)]
1027
pub struct FalseValue;
1028
impl BoolValue for FalseValue {
1029
    const VALUE: bool = false;
1030
}
1031
// Marker impl: lets `FalseValue` satisfy `IsFalse` bounds.
impl IsFalse for FalseValue {}