Coverage Report

Created: 2026-06-25 00:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/size_partitioning_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use std::sync::Arc;
17
18
use async_trait::async_trait;
19
use futures::try_join;
20
use nativelink_config::stores::SizePartitioningSpec;
21
use nativelink_error::{Error, ResultExt, make_input_err};
22
use nativelink_metric::MetricsComponent;
23
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
24
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
25
use nativelink_util::store_trait::{
26
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
27
};
28
use tokio::join;
29
30
#[derive(Debug, MetricsComponent)]
31
pub struct SizePartitioningStore {
32
    #[metric(help = "Size to partition our data")]
33
    partition_size: u64,
34
    #[metric(group = "lower_store")]
35
    lower_store: Store,
36
    #[metric(group = "upper_store")]
37
    upper_store: Store,
38
}
39
40
impl SizePartitioningStore {
41
3
    pub fn new(spec: &SizePartitioningSpec, lower_store: Store, upper_store: Store) -> Arc<Self> {
42
3
        Arc::new(Self {
43
3
            partition_size: spec.size,
44
3
            lower_store,
45
3
            upper_store,
46
3
        })
47
3
    }
48
}
49
50
#[async_trait]
51
impl StoreDriver for SizePartitioningStore {
52
0
    async fn post_init(self: Arc<Self>) -> Result<(), Error> {
53
        try_join!(
54
            self.upper_store.clone().into_inner().post_init(),
55
            self.lower_store.clone().into_inner().post_init(),
56
        )?;
57
        Ok(())
58
0
    }
59
60
    async fn has_with_results(
61
        self: Pin<&Self>,
62
        keys: &[StoreKey<'_>],
63
        results: &mut [Option<u64>],
64
2
    ) -> Result<(), Error> {
65
        let mut non_digest_sample = None;
66
        let (lower_digests, upper_digests): (Vec<_>, Vec<_>) =
67
2
            keys.iter().map(StoreKey::borrow).partition(|k| {
68
2
                let StoreKey::Digest(digest) = k else {
69
0
                    non_digest_sample = Some(k.borrow().into_owned());
70
0
                    return false;
71
                };
72
2
                digest.size_bytes() < self.partition_size
73
2
            });
74
        if let Some(non_digest) = non_digest_sample {
75
            return Err(make_input_err!(
76
                "SizePartitioningStore only supports Digest keys, got {non_digest:?}"
77
            ));
78
        }
79
        let (lower_results, upper_results) = join!(
80
            self.lower_store.has_many(&lower_digests),
81
            self.upper_store.has_many(&upper_digests),
82
        );
83
        let mut lower_results = match lower_results {
84
            Ok(lower_results) => lower_results.into_iter(),
85
            Err(err) => match upper_results {
86
                Ok(_) => return Err(err),
87
                Err(upper_err) => return Err(err.merge(upper_err)),
88
            },
89
        };
90
        let mut upper_digests = upper_digests.into_iter().peekable();
91
        let mut upper_results = upper_results?.into_iter();
92
        for (digest, result) in keys.iter().zip(results.iter_mut()) {
93
            if Some(digest) == upper_digests.peek() {
94
                upper_digests.next();
95
                *result = upper_results
96
                    .next()
97
                    .err_tip(|| "upper_results out of sync with upper_digests")?;
98
            } else {
99
                *result = lower_results
100
                    .next()
101
                    .err_tip(|| "lower_results out of sync with lower_digests")?;
102
            }
103
        }
104
        Ok(())
105
2
    }
106
107
    async fn update(
108
        self: Pin<&Self>,
109
        key: StoreKey<'_>,
110
        reader: DropCloserReadHalf,
111
        size_info: UploadSizeInfo,
112
2
    ) -> Result<u64, Error> {
113
        let digest = match key {
114
            StoreKey::Digest(digest) => digest,
115
            other @ StoreKey::Str(_) => {
116
                return Err(make_input_err!(
117
                    "SizePartitioningStore only supports Digest keys, got {other:?}"
118
                ));
119
            }
120
        };
121
        if digest.size_bytes() < self.partition_size {
122
            return self.lower_store.update(digest, reader, size_info).await;
123
        }
124
        self.upper_store.update(digest, reader, size_info).await
125
2
    }
126
127
    async fn get_part(
128
        self: Pin<&Self>,
129
        key: StoreKey<'_>,
130
        writer: &mut DropCloserWriteHalf,
131
        offset: u64,
132
        length: Option<u64>,
133
2
    ) -> Result<(), Error> {
134
        let digest = match key {
135
            StoreKey::Digest(digest) => digest,
136
            other @ StoreKey::Str(_) => {
137
                return Err(make_input_err!(
138
                    "SizePartitioningStore only supports Digest keys, got {other:?}"
139
                ));
140
            }
141
        };
142
        if digest.size_bytes() < self.partition_size {
143
            return self
144
                .lower_store
145
                .get_part(digest, writer, offset, length)
146
                .await;
147
        }
148
        self.upper_store
149
            .get_part(digest, writer, offset, length)
150
            .await
151
2
    }
152
153
0
    fn inner_store(&self, key: Option<StoreKey>) -> &'_ dyn StoreDriver {
154
0
        let Some(key) = key else {
155
0
            return self;
156
        };
157
0
        let StoreKey::Digest(digest) = key else {
158
0
            return self;
159
        };
160
0
        if digest.size_bytes() < self.partition_size {
161
0
            return self.lower_store.inner_store(Some(digest));
162
0
        }
163
0
        self.upper_store.inner_store(Some(digest))
164
0
    }
165
166
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
167
0
        self
168
0
    }
169
170
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
171
0
        self
172
0
    }
173
174
0
    fn register_remove_callback(
175
0
        self: Arc<Self>,
176
0
        callback: Arc<dyn RemoveItemCallback>,
177
0
    ) -> Result<(), Error> {
178
0
        self.lower_store
179
0
            .register_remove_callback(callback.clone())?;
180
0
        self.upper_store.register_remove_callback(callback)?;
181
0
        Ok(())
182
0
    }
183
}
184
185
default_health_status_indicator!(SizePartitioningStore);