Coverage Report

Created: 2025-10-24 14:08

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/size_partitioning_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use std::sync::Arc;
17
18
use async_trait::async_trait;
19
use nativelink_config::stores::SizePartitioningSpec;
20
use nativelink_error::{Error, ResultExt, make_input_err};
21
use nativelink_metric::MetricsComponent;
22
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
23
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
24
use nativelink_util::store_trait::{
25
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
26
};
27
use tokio::join;
28
29
#[derive(Debug, MetricsComponent)]
30
pub struct SizePartitioningStore {
31
    #[metric(help = "Size to partition our data")]
32
    partition_size: u64,
33
    #[metric(group = "lower_store")]
34
    lower_store: Store,
35
    #[metric(group = "upper_store")]
36
    upper_store: Store,
37
}
38
39
impl SizePartitioningStore {
40
3
    pub fn new(spec: &SizePartitioningSpec, lower_store: Store, upper_store: Store) -> Arc<Self> {
41
3
        Arc::new(Self {
42
3
            partition_size: spec.size,
43
3
            lower_store,
44
3
            upper_store,
45
3
        })
46
3
    }
47
}
48
49
#[async_trait]
50
impl StoreDriver for SizePartitioningStore {
51
    async fn has_with_results(
52
        self: Pin<&Self>,
53
        keys: &[StoreKey<'_>],
54
        results: &mut [Option<u64>],
55
2
    ) -> Result<(), Error> {
56
        let mut non_digest_sample = None;
57
        let (lower_digests, upper_digests): (Vec<_>, Vec<_>) =
58
2
            keys.iter().map(StoreKey::borrow).partition(|k| {
59
2
                let StoreKey::Digest(digest) = k else {
  Branch (59:21): [True: 2, False: 0]
  Branch (59:21): [Folded - Ignored]
60
0
                    non_digest_sample = Some(k.borrow().into_owned());
61
0
                    return false;
62
                };
63
2
                digest.size_bytes() < self.partition_size
64
2
            });
65
        if let Some(non_digest) = non_digest_sample {
66
            return Err(make_input_err!(
67
                "SizePartitioningStore only supports Digest keys, got {non_digest:?}"
68
            ));
69
        }
70
        let (lower_results, upper_results) = join!(
71
            self.lower_store.has_many(&lower_digests),
72
            self.upper_store.has_many(&upper_digests),
73
        );
74
        let mut lower_results = match lower_results {
75
            Ok(lower_results) => lower_results.into_iter(),
76
            Err(err) => match upper_results {
77
                Ok(_) => return Err(err),
78
                Err(upper_err) => return Err(err.merge(upper_err)),
79
            },
80
        };
81
        let mut upper_digests = upper_digests.into_iter().peekable();
82
        let mut upper_results = upper_results?.into_iter();
83
        for (digest, result) in keys.iter().zip(results.iter_mut()) {
84
            if Some(digest) == upper_digests.peek() {
85
                upper_digests.next();
86
                *result = upper_results
87
                    .next()
88
                    .err_tip(|| "upper_results out of sync with upper_digests")?;
89
            } else {
90
                *result = lower_results
91
                    .next()
92
                    .err_tip(|| "lower_results out of sync with lower_digests")?;
93
            }
94
        }
95
        Ok(())
96
2
    }
97
98
    async fn update(
99
        self: Pin<&Self>,
100
        key: StoreKey<'_>,
101
        reader: DropCloserReadHalf,
102
        size_info: UploadSizeInfo,
103
2
    ) -> Result<(), Error> {
104
        let digest = match key {
105
            StoreKey::Digest(digest) => digest,
106
            other @ StoreKey::Str(_) => {
107
                return Err(make_input_err!(
108
                    "SizePartitioningStore only supports Digest keys, got {other:?}"
109
                ));
110
            }
111
        };
112
        if digest.size_bytes() < self.partition_size {
113
            return self.lower_store.update(digest, reader, size_info).await;
114
        }
115
        self.upper_store.update(digest, reader, size_info).await
116
2
    }
117
118
    async fn get_part(
119
        self: Pin<&Self>,
120
        key: StoreKey<'_>,
121
        writer: &mut DropCloserWriteHalf,
122
        offset: u64,
123
        length: Option<u64>,
124
2
    ) -> Result<(), Error> {
125
        let digest = match key {
126
            StoreKey::Digest(digest) => digest,
127
            other @ StoreKey::Str(_) => {
128
                return Err(make_input_err!(
129
                    "SizePartitioningStore only supports Digest keys, got {other:?}"
130
                ));
131
            }
132
        };
133
        if digest.size_bytes() < self.partition_size {
134
            return self
135
                .lower_store
136
                .get_part(digest, writer, offset, length)
137
                .await;
138
        }
139
        self.upper_store
140
            .get_part(digest, writer, offset, length)
141
            .await
142
2
    }
143
144
0
    fn inner_store(&self, key: Option<StoreKey>) -> &'_ dyn StoreDriver {
145
0
        let Some(key) = key else {
  Branch (145:13): [True: 0, False: 0]
  Branch (145:13): [Folded - Ignored]
146
0
            return self;
147
        };
148
0
        let StoreKey::Digest(digest) = key else {
  Branch (148:13): [True: 0, False: 0]
  Branch (148:13): [Folded - Ignored]
149
0
            return self;
150
        };
151
0
        if digest.size_bytes() < self.partition_size {
  Branch (151:12): [True: 0, False: 0]
  Branch (151:12): [Folded - Ignored]
152
0
            return self.lower_store.inner_store(Some(digest));
153
0
        }
154
0
        self.upper_store.inner_store(Some(digest))
155
0
    }
156
157
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
158
0
        self
159
0
    }
160
161
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
162
0
        self
163
0
    }
164
165
0
    fn register_remove_callback(
166
0
        self: Arc<Self>,
167
0
        callback: Arc<dyn RemoveItemCallback>,
168
0
    ) -> Result<(), Error> {
169
0
        self.lower_store
170
0
            .register_remove_callback(callback.clone())?;
171
0
        self.upper_store.register_remove_callback(callback)?;
172
0
        Ok(())
173
0
    }
174
}
175
176
// Invokes the `default_health_status_indicator!` macro (imported from
// `nativelink_util::health_utils`) for `SizePartitioningStore`.
// NOTE(review): presumably this expands to a default `HealthStatusIndicator`
// implementation for the store — confirm against the macro definition.
default_health_status_indicator!(SizePartitioningStore);