Coverage Report

Created: 2026-06-29 16:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/store_trait.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::borrow::{Borrow, BorrowMut};
16
use core::convert::Into;
17
use core::fmt::{self, Debug, Display};
18
use core::future;
19
use core::hash::{Hash, Hasher};
20
use core::ops::{Bound, RangeBounds};
21
use core::pin::Pin;
22
use core::ptr::addr_eq;
23
use core::time::Duration;
24
use std::borrow::Cow;
25
use std::collections::hash_map::DefaultHasher as StdHasher;
26
use std::ffi::OsString;
27
use std::sync::{Arc, OnceLock};
28
29
use async_trait::async_trait;
30
use bytes::{Bytes, BytesMut};
31
use futures::{Future, FutureExt, Stream, join, try_join};
32
use nativelink_error::{Code, Error, ResultExt, error_if, make_err};
33
use nativelink_metric::MetricsComponent;
34
use rand::rngs::StdRng;
35
use rand::{RngCore, SeedableRng};
36
use serde::{Deserialize, Serialize};
37
use tokio::io::{AsyncReadExt, AsyncSeekExt};
38
use tracing::warn;
39
40
use crate::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair};
41
use crate::common::DigestInfo;
42
use crate::digest_hasher::{DigestHasher, DigestHasherFunc, default_digest_hasher_func};
43
use crate::fs;
44
use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
45
46
static DEFAULT_DIGEST_SIZE_HEALTH_CHECK: OnceLock<usize> = OnceLock::new();
47
/// Default digest size for health check data. Any change in this value
48
/// changes the default contract. `GlobalConfig` should be updated to reflect
49
/// changes in this value.
50
pub const DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG: usize = 1024 * 1024;
51
52
// Get the default digest size for health check data, if value is unset a system wide default is used.
53
1
pub fn default_digest_size_health_check() -> usize {
54
1
    *DEFAULT_DIGEST_SIZE_HEALTH_CHECK.get_or_init(|| DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG)
55
1
}
56
57
/// Set the default digest size for health check data, this should be called once.
58
0
pub fn set_default_digest_size_health_check(size: usize) -> Result<(), Error> {
59
0
    DEFAULT_DIGEST_SIZE_HEALTH_CHECK.set(size).map_err(|_| {
60
0
        make_err!(
61
0
            Code::Internal,
62
            "set_default_digest_size_health_check already set"
63
        )
64
0
    })
65
0
}
66
67
#[derive(
68
    Debug,
69
    PartialEq,
70
    Eq,
71
    Copy,
72
    Clone,
73
    Serialize,
74
    Deserialize,
75
0
    wincode::SchemaWrite,
76
0
    wincode::SchemaRead,
77
)]
78
pub enum UploadSizeInfo {
79
    /// When the data transfer amount is known to be exact size, this enum should be used.
80
    /// The receiver store can use this to better optimize the way the data is sent or stored.
81
    ExactSize(u64),
82
83
    /// When the data transfer amount is not known to be exact, the caller should use this enum
84
    /// to provide the maximum size that could possibly be sent. This will bypass the exact size
85
    /// checks, but still provide useful information to the underlying store about the data being
86
    /// sent that it can then use to optimize the upload process.
87
    MaxSize(u64),
88
}
89
90
/// Utility to send all the data to the store from a file.
91
// Note: This is not inlined because some code may want to bypass any underlying
92
// optimizations that may be present in the inner store.
93
11
pub async fn slow_update_store_with_file<S: StoreDriver + ?Sized>(
94
11
    store: Pin<&S>,
95
11
    digest: impl Into<StoreKey<'_>>,
96
11
    file: &mut fs::FileSlot,
97
11
    upload_size: UploadSizeInfo,
98
11
) -> Result<u64, Error> {
99
11
    file.rewind()
100
11
        .await
101
11
        .err_tip(|| "Failed to rewind in upload_file_to_store")
?0
;
102
11
    let (mut tx, rx) = make_buf_channel_pair();
103
104
11
    let update_fut = store
105
11
        .update(digest.into(), rx, upload_size)
106
11
        .map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store"));
107
11
    let read_data_fut = async move {
108
        loop {
109
85
            let mut buf = BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE);
110
85
            let read = file
111
85
                .read_buf(&mut buf)
112
85
                .await
113
85
                .err_tip(|| "Failed to read in upload_file_to_store")
?0
;
114
85
            if read == 0 {
115
11
                break;
116
74
            }
117
74
            tx.send(buf.freeze())
118
74
                .await
119
74
                .err_tip(|| "Failed to send in upload_file_to_store")
?0
;
120
        }
121
11
        tx.send_eof()
122
11
            .err_tip(|| "Could not send EOF to store in upload_file_to_store")
123
11
    };
124
11
    tokio::pin!(read_data_fut);
125
11
    let (update_res, read_res) = tokio::join!(update_fut, read_data_fut);
126
11
    read_res.merge(update_res)
127
11
}
128
129
/// Capabilities a store can advertise to its callers.
///
/// Callers can query these to pick a specialized code path when the store is
/// able to optimize the way the data is processed.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum StoreOptimizations {
    /// The store can optimize the upload process when it knows the data is
    /// coming from a file.
    FileUpdates,

    /// The store ignores data uploads.
    NoopUpdates,

    /// The store never serves downloads.
    NoopDownloads,

    /// The store determines whether a key has associated data once a read has
    /// been attempted, instead of calling `.has()` first.
    LazyExistenceOnSync,

    /// The store provides an optimized `update_oneshot` implementation that
    /// bypasses channel overhead for direct Bytes writes. Stores with this
    /// optimization can accept complete data directly without going through
    /// the MPSC channel.
    SubscribesToUpdateOneshot,
}
152
153
/// A wrapper struct for [`StoreKey`] to work around
154
/// lifetime limitations in `HashMap::get()` as described in
155
/// <https://github.com/rust-lang/rust/issues/80389>
156
///
157
/// As such this is a wrapper type that is stored in the
158
/// maps using the workaround as described in
159
/// <https://blinsay.com/blog/compound-keys/>
160
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
161
#[repr(transparent)]
162
pub struct StoreKeyBorrow(StoreKey<'static>);
163
164
impl From<StoreKey<'static>> for StoreKeyBorrow {
165
6.01k
    fn from(key: StoreKey<'static>) -> Self {
166
6.01k
        Self(key)
167
6.01k
    }
168
}
169
170
impl From<StoreKeyBorrow> for StoreKey<'static> {
171
0
    fn from(key_borrow: StoreKeyBorrow) -> Self {
172
0
        key_borrow.0
173
0
    }
174
}
175
176
impl<'a> Borrow<StoreKey<'a>> for StoreKeyBorrow {
177
5.06k
    fn borrow(&self) -> &StoreKey<'a> {
178
5.06k
        &self.0
179
5.06k
    }
180
}
181
182
impl<'a> Borrow<StoreKey<'a>> for &StoreKeyBorrow {
183
0
    fn borrow(&self) -> &StoreKey<'a> {
184
0
        &self.0
185
0
    }
186
}
187
188
/// Holds something that can be converted into a key the
189
/// store API can understand. Generally this is a digest
190
/// but it can also be a string if the caller wishes to
191
/// store the data directly and reference it by a string
192
/// directly.
193
#[derive(Debug, Eq)]
194
pub enum StoreKey<'a> {
195
    /// A string key.
196
    Str(Cow<'a, str>),
197
198
    /// A key that is a digest.
199
    Digest(DigestInfo),
200
}
201
202
impl<'a> StoreKey<'a> {
203
    /// Creates a new store key from a string.
204
26
    pub const fn new_str(s: &'a str) -> Self {
205
26
        StoreKey::Str(Cow::Borrowed(s))
206
26
    }
207
208
    /// Returns a shallow clone of the key.
209
    /// This is extremely cheap and should be used when clone
210
    /// is needed but the key is not going to be modified.
211
    #[must_use]
212
    #[allow(
213
        clippy::missing_const_for_fn,
214
        reason = "False positive on stable, but not on nightly"
215
    )]
216
5.18k
    pub fn borrow(&'a self) -> Self {
217
67
        match self {
218
26
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Borrowed(s)),
219
41
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Borrowed(s)),
220
5.11k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
221
        }
222
5.18k
    }
223
224
    /// Converts the key into an owned version. This is useful
225
    /// when the caller needs an owned version of the key.
226
13.1k
    pub fn into_owned(self) -> StoreKey<'static> {
227
71
        match self {
228
7
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Owned(s)),
229
64
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Owned(s.to_owned())),
230
13.0k
            StoreKey::Digest(d) => StoreKey::Digest(d),
231
        }
232
13.1k
    }
233
234
    /// Converts the key into a digest. This is useful when the caller
235
    /// must have a digest key. If the data is not a digest, it may
236
    /// hash the underlying key and return a digest of the hash of the key
237
186
    pub fn into_digest(self) -> DigestInfo {
238
186
        match self {
239
184
            StoreKey::Digest(digest) => digest,
240
2
            StoreKey::Str(s) => {
241
2
                let mut hasher = DigestHasherFunc::Blake3.hasher();
242
2
                hasher.update(s.as_bytes());
243
2
                hasher.finalize_digest()
244
            }
245
        }
246
186
    }
247
248
    /// Returns the key as a string. If the key is a digest, it will
249
    /// return a string representation of the digest. If the key is a string,
250
    /// it will return the string itself.
251
684
    pub fn as_str(&'a self) -> Cow<'a, str> {
252
542
        match self {
253
541
            StoreKey::Str(Cow::Owned(s)) => Cow::Borrowed(s),
254
1
            StoreKey::Str(Cow::Borrowed(s)) => Cow::Borrowed(s),
255
142
            StoreKey::Digest(d) => Cow::Owned(format!("{d}")),
256
        }
257
684
    }
258
}
259
260
impl Clone for StoreKey<'static> {
261
10.5k
    fn clone(&self) -> Self {
262
10.5k
        match self {
263
27
            StoreKey::Str(s) => StoreKey::Str(s.clone()),
264
10.4k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
265
        }
266
10.5k
    }
267
}
268
269
impl PartialOrd for StoreKey<'_> {
270
40
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
271
40
        Some(self.cmp(other))
272
40
    }
273
}
274
275
impl Ord for StoreKey<'_> {
276
57
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
277
57
        match (self, other) {
278
57
            (StoreKey::Str(a), StoreKey::Str(b)) => a.cmp(b),
279
0
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a.cmp(b),
280
0
            (StoreKey::Str(_), StoreKey::Digest(_)) => core::cmp::Ordering::Less,
281
0
            (StoreKey::Digest(_), StoreKey::Str(_)) => core::cmp::Ordering::Greater,
282
        }
283
57
    }
284
}
285
286
impl PartialEq for StoreKey<'_> {
287
6.20k
    fn eq(&self, other: &Self) -> bool {
288
6.20k
        match (self, other) {
289
51
            (StoreKey::Str(a), StoreKey::Str(b)) => a == b,
290
6.15k
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a == b,
291
0
            _ => false,
292
        }
293
6.20k
    }
294
}
295
296
impl Hash for StoreKey<'_> {
297
26.4k
    fn hash<H: Hasher>(&self, state: &mut H) {
298
        /// Salts the hash with the enum value that represents
299
        /// the type of the key.
300
        #[repr(u8)]
301
        enum HashId {
302
            Str = 0,
303
            Digest = 1,
304
        }
305
26.4k
        match self {
306
44
            StoreKey::Str(s) => {
307
44
                (HashId::Str as u8).hash(state);
308
44
                s.hash(state);
309
44
            }
310
26.3k
            StoreKey::Digest(d) => {
311
26.3k
                (HashId::Digest as u8).hash(state);
312
26.3k
                d.hash(state);
313
26.3k
            }
314
        }
315
26.4k
    }
316
}
317
318
impl<'a> From<&'a str> for StoreKey<'a> {
319
0
    fn from(s: &'a str) -> Self {
320
0
        StoreKey::Str(Cow::Borrowed(s))
321
0
    }
322
}
323
324
impl From<String> for StoreKey<'static> {
325
0
    fn from(s: String) -> Self {
326
0
        StoreKey::Str(Cow::Owned(s))
327
0
    }
328
}
329
330
impl From<DigestInfo> for StoreKey<'_> {
    /// Builds a digest key from `d`.
    fn from(d: DigestInfo) -> Self {
        Self::Digest(d)
    }
}
335
336
impl From<&DigestInfo> for StoreKey<'_> {
    /// Builds a digest key from a copy of `d`.
    fn from(d: &DigestInfo) -> Self {
        let digest = *d;
        Self::Digest(digest)
    }
}
341
342
// mostly for use with tracing::Value
343
impl Display for StoreKey<'_> {
344
56
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
345
56
        match self {
346
32
            StoreKey::Str(s) => {
347
32
                write!(f, "{s}")
348
            }
349
24
            StoreKey::Digest(d) => {
350
24
                write!(f, "Digest: {d}")
351
            }
352
        }
353
56
    }
354
}
355
356
#[derive(Clone, MetricsComponent)]
357
#[repr(transparent)]
358
pub struct Store {
359
    #[metric]
360
    inner: Arc<dyn StoreDriver>,
361
}
362
363
impl Debug for Store {
364
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
365
0
        f.debug_struct("Store").finish_non_exhaustive()
366
0
    }
367
}
368
369
impl Store {
370
440
    pub fn new(inner: Arc<dyn StoreDriver>) -> Self {
371
440
        Self { inner }
372
440
    }
373
374
    /// Returns the immediate inner store driver.
375
    /// Note: This does not recursively try to resolve underlying store drivers
376
    /// like `.inner_store()` does.
377
    #[inline]
378
10
    pub fn into_inner(self) -> Arc<dyn StoreDriver> {
379
10
        self.inner
380
10
    }
381
382
    /// Gets the underlying store for the given digest.
383
    /// A caller might want to use this to obtain a reference to the "real" underlying store
384
    /// (if applicable) and check if it implements some special traits that allow optimizations.
385
    /// Note: If the store performs complex operations on the data, it should return itself.
386
    #[inline]
387
434
    pub fn inner_store<'a, K: Into<StoreKey<'a>>>(&self, digest: Option<K>) -> &dyn StoreDriver {
388
434
        self.inner.inner_store(digest.map(Into::into))
389
434
    }
390
391
    /// Tries to cast the underlying store to the given type.
392
    #[inline]
393
96
    pub fn downcast_ref<U: StoreDriver>(&self, maybe_digest: Option<StoreKey<'_>>) -> Option<&U> {
394
96
        self.inner.inner_store(maybe_digest).as_any().downcast_ref()
395
96
    }
396
397
    /// Register health checks used to monitor the store.
398
    #[inline]
399
0
    pub fn register_health(&self, registry: &mut HealthRegistryBuilder) {
400
0
        self.inner.clone().register_health(registry);
401
0
    }
402
403
    #[inline]
404
9
    pub fn register_remove_callback(
405
9
        &self,
406
9
        callback: Arc<dyn RemoveItemCallback>,
407
9
    ) -> Result<(), Error> {
408
9
        self.inner.clone().register_remove_callback(callback)
409
9
    }
410
}
411
412
impl StoreLike for Store {
413
    #[inline]
414
10.6k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
415
10.6k
        self.inner.as_ref()
416
10.6k
    }
417
418
12
    fn as_pin(&self) -> Pin<&Self> {
419
12
        Pin::new(self)
420
12
    }
421
}
422
423
impl<T> StoreLike for T
424
where
425
    T: StoreDriver + Sized,
426
{
427
    #[inline]
428
8.24k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
429
8.24k
        self
430
8.24k
    }
431
432
117
    fn as_pin(&self) -> Pin<&Self> {
433
117
        Pin::new(self)
434
117
    }
435
}
436
437
pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
438
    /// Returns the immediate inner store driver.
439
    fn as_store_driver(&self) -> &'_ dyn StoreDriver;
440
441
    /// Utility function to return a pinned reference to self.
442
    fn as_pin(&self) -> Pin<&Self>;
443
444
    /// Utility function to return a pinned reference to the store driver.
445
    #[inline]
446
18.8k
    fn as_store_driver_pin(&self) -> Pin<&'_ dyn StoreDriver> {
447
18.8k
        Pin::new(self.as_store_driver())
448
18.8k
    }
449
450
    /// Look up a digest in the store and return None if it does not exist in
451
    /// the store, or Some(size) if it does.
452
    /// Note: On an AC store the size will be incorrect and should not be used!
453
    #[inline]
454
467
    fn has<'a>(
455
467
        &'a self,
456
467
        digest: impl Into<StoreKey<'a>>,
457
467
    ) -> impl Future<Output = Result<Option<u64>, Error>> + 'a {
458
467
        self.as_store_driver_pin().has(digest.into())
459
467
    }
460
461
    /// Look up a list of digests in the store and return a result for each in
462
    /// the same order as input.  The result will either be None if it does not
463
    /// exist in the store, or Some(size) if it does.
464
    /// Note: On an AC store the size will be incorrect and should not be used!
465
    #[inline]
466
23
    fn has_many<'a>(
467
23
        &'a self,
468
23
        digests: &'a [StoreKey<'a>],
469
23
    ) -> impl Future<Output = Result<Vec<Option<u64>>, Error>> + Send + 'a {
470
23
        if digests.is_empty() {
471
3
            return future::ready(Ok(vec![])).boxed();
472
20
        }
473
20
        self.as_store_driver_pin().has_many(digests)
474
23
    }
475
476
    /// The implementation of the above has and `has_many` functions.  See their
477
    /// documentation for details.
478
    #[inline]
479
72
    fn has_with_results<'a>(
480
72
        &'a self,
481
72
        digests: &'a [StoreKey<'a>],
482
72
        results: &'a mut [Option<u64>],
483
72
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
484
72
        if digests.is_empty() {
485
3
            return future::ready(Ok(())).boxed();
486
69
        }
487
69
        self.as_store_driver_pin()
488
69
            .has_with_results(digests, results)
489
72
    }
490
491
    /// List all the keys in the store that are within the given range.
492
    /// `handler` is called for each key in the range. If `handler` returns
493
    /// false, the listing is stopped.
494
    ///
495
    /// The number of keys passed through the handler is the return value.
496
    #[inline]
497
15
    fn list<'a, 'b>(
498
15
        &'a self,
499
15
        range: impl RangeBounds<StoreKey<'b>> + Send + 'b,
500
15
        mut handler: impl for<'c> FnMut(&'c StoreKey) -> bool + Send + Sync + 'a,
501
15
    ) -> impl Future<Output = Result<u64, Error>> + Send + 'a
502
15
    where
503
15
        'b: 'a,
504
    {
505
        // Note: We use a manual async move, so the future can own the `range` and `handler`,
506
        // otherwise we'd require the caller to pass them in by reference making more borrow
507
        // checker noise.
508
15
        async move {
509
15
            self.as_store_driver_pin()
510
15
                .list(
511
15
                    (
512
15
                        range.start_bound().map(StoreKey::borrow),
513
15
                        range.end_bound().map(StoreKey::borrow),
514
15
                    ),
515
15
                    &mut handler,
516
15
                )
517
15
                .await
518
15
        }
519
15
    }
520
521
    /// Sends the data to the store.
522
    #[inline]
523
5.36k
    fn update<'a>(
524
5.36k
        &'a self,
525
5.36k
        digest: impl Into<StoreKey<'a>>,
526
5.36k
        reader: DropCloserReadHalf,
527
5.36k
        upload_size: UploadSizeInfo,
528
5.36k
    ) -> impl Future<Output = Result<u64, Error>> + Send + 'a {
529
5.36k
        self.as_store_driver_pin()
530
5.36k
            .update(digest.into(), reader, upload_size)
531
5.36k
    }
532
533
    /// Any optimizations the store might want to expose to the callers.
534
    /// By default, no optimizations are exposed.
535
    #[inline]
536
27
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
537
27
        self.as_store_driver_pin().optimized_for(optimization)
538
27
    }
539
540
    /// Specialized version of `.update()` which takes a `FileSlot`.
541
    /// This is useful if the underlying store can optimize the upload process
542
    /// when it knows the data is coming from a file.
543
    #[inline]
544
14
    fn update_with_whole_file<'a>(
545
14
        &'a self,
546
14
        digest: impl Into<StoreKey<'a>>,
547
14
        path: OsString,
548
14
        file: fs::FileSlot,
549
14
        upload_size: UploadSizeInfo,
550
14
    ) -> impl Future<Output = Result<(u64, Option<fs::FileSlot>), Error>> + Send + 'a {
551
14
        self.as_store_driver_pin()
552
14
            .update_with_whole_file(digest.into(), path, file, upload_size)
553
14
    }
554
555
    /// Utility to send all the data to the store when you have all the bytes.
556
    #[inline]
557
5.75k
    fn update_oneshot<'a>(
558
5.75k
        &'a self,
559
5.75k
        digest: impl Into<StoreKey<'a>>,
560
5.75k
        data: Bytes,
561
5.75k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
562
5.75k
        self.as_store_driver_pin()
563
5.75k
            .update_oneshot(digest.into(), data)
564
5.75k
    }
565
566
    /// Retrieves part of the data from the store and writes it to the given writer.
567
    #[inline]
568
1.41k
    fn get_part<'a>(
569
1.41k
        &'a self,
570
1.41k
        digest: impl Into<StoreKey<'a>>,
571
1.41k
        mut writer: impl BorrowMut<DropCloserWriteHalf> + Send + 'a,
572
1.41k
        offset: u64,
573
1.41k
        length: Option<u64>,
574
1.41k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
575
1.41k
        let key = digest.into();
576
        // Note: We need to capture `writer` just in case the caller
577
        // expects the drop() method to be called on it when the future
578
        // is done due to the complex interaction between the DropCloserWriteHalf
579
        // and the DropCloserReadHalf during drop().
580
1.41k
        async move {
581
1.41k
            self.as_store_driver_pin()
582
1.41k
                .get_part(key, writer.borrow_mut(), offset, length)
583
1.41k
                .await
584
1.41k
        }
585
1.41k
    }
586
587
    /// Utility that works the same as `.get_part()`, but writes all the data.
588
    #[inline]
589
55
    fn get<'a>(
590
55
        &'a self,
591
55
        key: impl Into<StoreKey<'a>>,
592
55
        writer: DropCloserWriteHalf,
593
55
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
594
55
        self.as_store_driver_pin().get(key.into(), writer)
595
55
    }
596
597
    /// Utility that will return all the bytes at once instead of in a streaming manner.
598
    #[inline]
599
5.34k
    fn get_part_unchunked<'a>(
600
5.34k
        &'a self,
601
5.34k
        key: impl Into<StoreKey<'a>>,
602
5.34k
        offset: u64,
603
5.34k
        length: Option<u64>,
604
5.34k
    ) -> impl Future<Output = Result<Bytes, Error>> + Send + 'a {
605
5.34k
        self.as_store_driver_pin()
606
5.34k
            .get_part_unchunked(key.into(), offset, length)
607
5.34k
    }
608
609
    /// Default implementation of the health check. Some stores may want to override this
610
    /// in situations where the default implementation is not sufficient.
611
    #[inline]
612
1
    fn check_health(
613
1
        &self,
614
1
        namespace: Cow<'static, str>,
615
1
    ) -> impl Future<Output = HealthStatus> + Send {
616
1
        self.as_store_driver_pin().check_health(namespace)
617
1
    }
618
}
619
620
#[async_trait]
621
pub trait StoreDriver:
622
    Sync + Send + Unpin + MetricsComponent + HealthStatusIndicator + 'static
623
{
624
    // Do "all the stores are setup" init e.g. if we need access to the store manager
625
    // for ref stores
626
    async fn post_init(self: Arc<Self>) -> Result<(), Error>;
627
628
    /// See: [`StoreLike::has`] for details.
629
    #[inline]
630
498
    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
631
        let mut result = [None];
632
        self.has_with_results(&[key], &mut result).await?;
633
        Ok(result[0])
634
498
    }
635
636
    /// See: [`StoreLike::has_many`] for details.
637
    #[inline]
638
    async fn has_many(
639
        self: Pin<&Self>,
640
        digests: &[StoreKey<'_>],
641
20
    ) -> Result<Vec<Option<u64>>, Error> {
642
        let mut results = vec![None; digests.len()];
643
        self.has_with_results(digests, &mut results).await?;
644
        Ok(results)
645
20
    }
646
647
    /// See: [`StoreLike::has_with_results`] for details.
648
    async fn has_with_results(
649
        self: Pin<&Self>,
650
        digests: &[StoreKey<'_>],
651
        results: &mut [Option<u64>],
652
    ) -> Result<(), Error>;
653
654
    /// See: [`StoreLike::list`] for details.
655
    async fn list(
656
        self: Pin<&Self>,
657
        _range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
658
        _handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
659
0
    ) -> Result<u64, Error> {
660
        // TODO(palfrey) We should force all stores to implement this function instead of
661
        // providing a default implementation.
662
        Err(make_err!(
663
            Code::Unimplemented,
664
            "Store::list() not implemented for this store"
665
        ))
666
0
    }
667
668
    /// See: [`StoreLike::update`] for details.
669
    async fn update(
670
        self: Pin<&Self>,
671
        key: StoreKey<'_>,
672
        reader: DropCloserReadHalf,
673
        upload_size: UploadSizeInfo,
674
    ) -> Result<u64, Error>;
675
676
    /// See: [`StoreLike::optimized_for`] for details.
677
15
    fn optimized_for(&self, _optimization: StoreOptimizations) -> bool {
678
15
        false
679
15
    }
680
681
    /// See: [`StoreLike::update_with_whole_file`] for details.
682
    async fn update_with_whole_file(
683
        self: Pin<&Self>,
684
        key: StoreKey<'_>,
685
        path: OsString,
686
        mut file: fs::FileSlot,
687
        upload_size: UploadSizeInfo,
688
1
    ) -> Result<(u64, Option<fs::FileSlot>), Error> {
689
        let inner_store = self.inner_store(Some(key.borrow()));
690
        if inner_store.optimized_for(StoreOptimizations::FileUpdates) {
691
            error_if!(
692
                addr_eq(inner_store, &raw const *self),
693
                "Store::inner_store() returned self when optimization present"
694
            );
695
            return Pin::new(inner_store)
696
                .update_with_whole_file(key, path, file, upload_size)
697
                .await;
698
        }
699
        let size = slow_update_store_with_file(self, key, &mut file, upload_size).await?;
700
        Ok((size, Some(file)))
701
1
    }
702
703
    /// See: [`StoreLike::update_oneshot`] for details.
704
5.20k
    async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> {
705
        // TODO(palfrey) This is extremely inefficient, since we have exactly
706
        // what we need here. Maybe we could instead make a version of the stream
707
        // that can take objects already fully in memory instead?
708
        let (mut tx, rx) = make_buf_channel_pair();
709
710
        let data_len =
711
            u64::try_from(data.len()).err_tip(|| "Could not convert data.len() to u64")?;
712
5.20k
        let send_fut = async move {
713
            // Only send if we are not EOF.
714
5.20k
            if !data.is_empty() {
715
5.15k
                tx.send(data)
716
5.15k
                    .await
717
5.15k
                    .err_tip(|| "Failed to write data in update_oneshot")
?0
;
718
49
            }
719
5.20k
            tx.send_eof()
720
5.20k
                .err_tip(|| "Failed to write EOF in update_oneshot")
?0
;
721
5.20k
            Ok(())
722
5.20k
        };
723
        try_join!(
724
            send_fut,
725
            self.update(key, rx, UploadSizeInfo::ExactSize(data_len))
726
        )?;
727
        Ok(())
728
5.20k
    }
729
730
    /// See: [`StoreLike::get_part`] for details.
731
    async fn get_part(
732
        self: Pin<&Self>,
733
        key: StoreKey<'_>,
734
        writer: &mut DropCloserWriteHalf,
735
        offset: u64,
736
        length: Option<u64>,
737
    ) -> Result<(), Error>;
738
739
    /// See: [`StoreLike::get`] for details.
740
    #[inline]
741
    async fn get(
742
        self: Pin<&Self>,
743
        key: StoreKey<'_>,
744
        mut writer: DropCloserWriteHalf,
745
55
    ) -> Result<(), Error> {
746
        self.get_part(key, &mut writer, 0, None).await
747
55
    }
748
749
    /// See: [`StoreLike::get_part_unchunked`] for details.
750
    async fn get_part_unchunked(
751
        self: Pin<&Self>,
752
        key: StoreKey<'_>,
753
        offset: u64,
754
        length: Option<u64>,
755
5.50k
    ) -> Result<Bytes, Error> {
756
        let length_usize = length
757
2.31k
            .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
758
            .transpose()?;
759
760
        // TODO(palfrey) This is extremely inefficient, since we have exactly
761
        // what we need here. Maybe we could instead make a version of the stream
762
        // that can take objects already fully in memory instead?
763
        let (mut tx, mut rx) = make_buf_channel_pair();
764
765
        let (data_res, get_part_res) = join!(
766
            rx.consume(length_usize),
767
            // We use a closure here to ensure that the `tx` is dropped when the
768
            // future is done.
769
5.50k
            async move { self.get_part(key, &mut tx, offset, length).await 
}5.50k
,
770
        );
771
        get_part_res
772
            .err_tip(|| "Failed to get_part in get_part_unchunked")
773
            .merge(data_res.err_tip(|| "Failed to read stream to completion in get_part_unchunked"))
774
5.50k
    }
775
776
    /// See: [`StoreLike::check_health`] for details.
777
1
    async fn check_health(self: Pin<&Self>, namespace: Cow<'static, str>) -> HealthStatus {
778
        let digest_data_size = default_digest_size_health_check();
779
        let mut digest_data = vec![0u8; digest_data_size];
780
781
        let mut namespace_hasher = StdHasher::new();
782
        namespace.hash(&mut namespace_hasher);
783
        self.get_name().hash(&mut namespace_hasher);
784
        let hash_seed = namespace_hasher.finish();
785
786
        // Fill the digest data with random data based on a stable
787
        // hash of the namespace and store name. Intention is to
788
        // have randomly filled data that is unique per store and
789
        // does not change between health checks. This is to ensure
790
        // we are not adding more data to store on each health check.
791
        let mut rng: StdRng = StdRng::seed_from_u64(hash_seed);
792
        rng.fill_bytes(&mut digest_data);
793
794
        let mut digest_hasher = default_digest_hasher_func().hasher();
795
        digest_hasher.update(&digest_data);
796
        let digest_data_len = digest_data.len() as u64;
797
        let digest_info = StoreKey::from(digest_hasher.finalize_digest());
798
799
        let digest_bytes = Bytes::copy_from_slice(&digest_data);
800
801
        if let Err(e) = self
802
            .update_oneshot(digest_info.borrow(), digest_bytes.clone())
803
            .await
804
        {
805
            warn!(?e, "check_health Store.update_oneshot() failed");
806
            return HealthStatus::new_failed(
807
                self.get_ref(),
808
                format!("Store.update_oneshot() failed: {e}").into(),
809
            );
810
        }
811
812
        match self.has(digest_info.borrow()).await {
813
            Ok(Some(s)) => {
814
                if s != digest_data_len {
815
                    return HealthStatus::new_failed(
816
                        self.get_ref(),
817
                        format!("Store.has() size mismatch {s} != {digest_data_len}").into(),
818
                    );
819
                }
820
            }
821
            Ok(None) => {
822
                return HealthStatus::new_failed(
823
                    self.get_ref(),
824
                    "Store.has() size not found".into(),
825
                );
826
            }
827
            Err(e) => {
828
                return HealthStatus::new_failed(
829
                    self.get_ref(),
830
                    format!("Store.has() failed: {e}").into(),
831
                );
832
            }
833
        }
834
835
        match self
836
            .get_part_unchunked(digest_info, 0, Some(digest_data_len))
837
            .await
838
        {
839
            Ok(b) => {
840
                if b != digest_bytes {
841
                    return HealthStatus::new_failed(
842
                        self.get_ref(),
843
                        "Store.get_part_unchunked() data mismatch".into(),
844
                    );
845
                }
846
            }
847
            Err(e) => {
848
                return HealthStatus::new_failed(
849
                    self.get_ref(),
850
                    format!("Store.get_part_unchunked() failed: {e}").into(),
851
                );
852
            }
853
        }
854
855
        HealthStatus::new_ok(self.get_ref(), "Successfully store health check".into())
856
1
    }
857
858
    /// See: [`Store::inner_store`] for details.
859
    fn inner_store(&self, _digest: Option<StoreKey<'_>>) -> &dyn StoreDriver;
860
861
    /// Returns an Any variation of whatever Self is.
862
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static);
863
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static>;
864
865
    // Register health checks used to monitor the store.
866
0
    fn register_health(self: Arc<Self>, _registry: &mut HealthRegistryBuilder) {}
867
868
    fn register_remove_callback(
869
        self: Arc<Self>,
870
        callback: Arc<dyn RemoveItemCallback>,
871
    ) -> Result<(), Error>;
872
}
873
874
/// Callback to be called when a store deletes an item. This is used so
875
/// compound stores can remove items from their internal state when their
876
/// underlying stores remove items e.g. caches
877
pub trait RemoveItemCallback: Debug + Send + Sync {
878
    /// Handles one callback invocation for `store_key`.
    ///
    /// Returns a boxed, pinned, `Send` future with no output value; the
    /// future may borrow both `self` and `store_key` for the lifetime `'a`.
    fn callback<'a>(
879
        &'a self,
880
        store_key: StoreKey<'a>,
881
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
882
}
883
884
/// The instructions on how to decode a value from a Bytes & version into
885
/// the underlying type.
886
pub trait SchedulerStoreDecodeTo {
887
    /// The type produced by a successful [`Self::decode`].
    type DecodeOutput;
888
    /// Decodes `data`, together with its `version`, into
    /// [`Self::DecodeOutput`].
    ///
    /// This is an associated function (no `self`), so decoding depends only
    /// on the implementing type and the two arguments.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] when the implementation cannot decode `data`.
    fn decode(version: i64, data: Bytes) -> Result<Self::DecodeOutput, Error>;
889
}
890
891
/// A subscription handed out by
/// [`SchedulerSubscriptionManager::subscribe`].
pub trait SchedulerSubscription: Send + Sync {
892
    /// Returns a `Send` future that resolves to `Ok(())` or an [`Error`].
    // NOTE(review): presumably the future completes once the subscribed key
    // has changed -- confirm against the implementations.
    fn changed(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
893
}
894
895
/// Creates [`SchedulerSubscription`]s for keys in a scheduler store.
pub trait SchedulerSubscriptionManager: Send + Sync {
896
    /// The concrete subscription type returned by [`Self::subscribe`].
    type Subscription: SchedulerSubscription;
897
898
    /// Creates a subscription for `key`, where `key` is any
    /// [`SchedulerStoreKeyProvider`].
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] if the subscription cannot be created.
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
899
    where
900
        K: SchedulerStoreKeyProvider;
901
902
    /// Type-level flag (no `self`) reporting whether this manager is reliable.
    // NOTE(review): "reliable" presumably means change notifications are
    // never missed -- confirm with implementors and callers.
    fn is_reliable() -> bool;
903
}
904
905
/// The API surface for a scheduler store.
906
pub trait SchedulerStore: Send + Sync + 'static {
907
    /// The subscription manager type handed out by
    /// [`Self::subscription_manager`].
    type SubscriptionManager: SchedulerSubscriptionManager;
908
909
    /// Returns the subscription manager for the scheduler store.
910
    fn subscription_manager(
911
        &self,
912
    ) -> impl Future<Output = Result<Arc<Self::SubscriptionManager>, Error>> + Send;
913
914
    /// Updates or inserts an entry into the underlying store.
915
    /// Metadata about the key is attached to the compile-time type.
916
    /// If `SchedulerStoreKeyProvider::Versioned` is `TrueValue`, the data will not
917
    /// be updated if the current version in the database does not match
918
    /// the version in the passed in data.
919
    /// No guarantees are made about when `Versioned` is `FalseValue`.
920
    /// Indexes are guaranteed to be updated atomically with the data.
    ///
    /// `data` supplies the key, the payload bytes, and the current version
    /// through the three provider traits in the `where` clause; `expiry` is
    /// an optional [`Duration`].
    // NOTE(review): the `Option<i64>` result is presumably the new version on
    // success and `None` on a version mismatch -- confirm with implementors.
    // NOTE(review): `expiry` presumably sets a time-to-live for the entry --
    // confirm.
921
    fn update_data<T>(
922
        &self,
923
        data: T,
924
        expiry: Option<Duration>,
925
    ) -> impl Future<Output = Result<Option<i64>, Error>> + Send
926
    where
927
        T: SchedulerStoreDataProvider
928
            + SchedulerStoreKeyProvider
929
            + SchedulerCurrentVersionProvider
930
            + Send;
931
932
    /// Searches for all keys in the store that match the given index prefix.
    ///
    /// On success the future yields a stream whose items are either a
    /// decoded value (`K::DecodeOutput`) or an [`Error`].
933
    fn search_by_index_prefix<K>(
934
        &self,
935
        index: K,
936
    ) -> impl Future<
937
        Output = Result<
938
            impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
939
            Error,
940
        >,
941
    > + Send
942
    where
943
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
944
        <K as SchedulerStoreDecodeTo>::DecodeOutput: Send;
945
946
    /// Returns data for the provided key with the given version if
947
    /// `SchedulerStoreKeyProvider::Versioned` is `TrueValue`.
    ///
    /// The future resolves to `Ok(Some(decoded))`, `Ok(None)`, or an
    /// [`Error`].
    // NOTE(review): `Ok(None)` presumably means the key was not found --
    // confirm with implementors.
948
    fn get_and_decode<K>(
949
        &self,
950
        key: K,
951
    ) -> impl Future<Output = Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>> + Send
952
    where
953
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send;
954
}
955
956
/// A type that is used to let the scheduler store know what
957
/// index is being requested.
///
/// It is one of the bounds on the `index` argument of
/// [`SchedulerStore::search_by_index_prefix`].
958
pub trait SchedulerIndexProvider {
959
    /// Only keys inserted with this prefix will be indexed.
960
    const KEY_PREFIX: &'static str;
961
962
    /// The name of the index.
963
    const INDEX_NAME: &'static str;
964
965
    /// The sort key for the index (if any).
    ///
    /// Defaults to `None` unless the implementor overrides it.
966
    const MAYBE_SORT_KEY: Option<&'static str> = None;
967
968
    /// If the data is versioned.
    ///
    /// Expressed as a compile-time boolean type implementing [`BoolValue`],
    /// such as [`TrueValue`] or [`FalseValue`].
969
    type Versioned: BoolValue;
970
971
    /// The value of the index.
    ///
    /// Returned as a [`Cow`], so an implementation may either borrow from
    /// `self` or return an owned string.
972
    fn index_value(&self) -> Cow<'_, str>;
973
}
974
975
/// Provides a key to lookup data in the store.
976
pub trait SchedulerStoreKeyProvider {
977
    /// If the data is versioned.
    ///
    /// Expressed as a compile-time boolean type implementing [`BoolValue`],
    /// such as [`TrueValue`] or [`FalseValue`].
978
    type Versioned: BoolValue;
979
980
    /// Returns the key for the data.
    ///
    /// The key has a `'static` lifetime, so it does not borrow from `self`.
981
    fn get_key(&self) -> StoreKey<'static>;
982
}
983
984
/// Provides data to be stored in the scheduler store.
985
pub trait SchedulerStoreDataProvider {
986
    /// Converts the data into bytes to be stored in the store.
    ///
    /// Consumes `self`.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] if the implementation cannot produce the bytes.
987
    fn try_into_bytes(self) -> Result<Bytes, Error>;
988
989
    /// Returns the indexes for the data if any.
    ///
    /// The default implementation returns an empty list.
    // NOTE(review): each `(&'static str, Bytes)` pair is presumably
    // (index name, index value) -- confirm with implementors.
990
4
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
991
4
        Ok(Vec::new())
992
4
    }
993
}
994
995
/// Provides the current version of the data in the store.
///
/// It is one of the bounds on the `data` argument of
/// [`SchedulerStore::update_data`].
997
pub trait SchedulerCurrentVersionProvider {
998
    /// Returns the current version of the data in the store.
999
    fn current_version(&self) -> i64;
1000
}
1000
1001
/// Default implementation for when we are not providing a version
1002
/// for the data.
///
/// This is a blanket impl covering every [`SchedulerStoreKeyProvider`]
/// whose `Versioned` type is [`FalseValue`].
1003
impl<T> SchedulerCurrentVersionProvider for T
1004
where
1005
    T: SchedulerStoreKeyProvider<Versioned = FalseValue>,
1006
{
1007
0
    /// Always returns `0` for unversioned data.
    fn current_version(&self) -> i64 {
1008
0
        0
1009
0
    }
1010
}
1011
1012
/// Compile time types for booleans.
///
/// Implemented in this file by [`TrueValue`] and [`FalseValue`].
1013
pub trait BoolValue {
1014
    /// The boolean this type stands for.
    const VALUE: bool;
1015
}
1016
/// Compile time check if something is false.
///
/// Marker trait with no items; implemented in this file by [`FalseValue`].
1017
pub trait IsFalse {}
1018
/// Compile time check if something is true.
///
/// Marker trait with no items; implemented in this file by [`TrueValue`].
1019
pub trait IsTrue {}
1020
1021
/// Compile time true value.
///
/// A unit struct used as a type-level `true`, e.g. for the `Versioned`
/// associated type of [`SchedulerStoreKeyProvider`].
1022
#[derive(Debug, Clone, Copy)]
1023
pub struct TrueValue;
1024
impl BoolValue for TrueValue {
1025
    const VALUE: bool = true;
1026
}
1027
// Marks `TrueValue` as satisfying `IsTrue` bounds.
impl IsTrue for TrueValue {}
1028
1029
/// Compile time false value.
///
/// A unit struct used as a type-level `false`, e.g. for the `Versioned`
/// associated type of [`SchedulerStoreKeyProvider`].
1030
#[derive(Debug, Clone, Copy)]
1031
pub struct FalseValue;
1032
impl BoolValue for FalseValue {
1033
    const VALUE: bool = false;
1034
}
1035
// Marks `FalseValue` as satisfying `IsFalse` bounds.
impl IsFalse for FalseValue {}