Coverage Report

Created: 2024-11-20 10:13

/build/source/nativelink-store/src/size_partitioning_store.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::pin::Pin;
16
use std::sync::Arc;
17
18
use async_trait::async_trait;
19
use nativelink_config::stores::SizePartitioningSpec;
20
use nativelink_error::{make_input_err, Error, ResultExt};
21
use nativelink_metric::MetricsComponent;
22
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
23
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
24
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
25
use tokio::join;
26
27
0
#[derive(MetricsComponent)]
28
pub struct SizePartitioningStore {
29
    #[metric(help = "Size to partition our data")]
30
    partition_size: u64,
31
    #[metric(group = "lower_store")]
32
    lower_store: Store,
33
    #[metric(group = "upper_store")]
34
    upper_store: Store,
35
}
36
37
impl SizePartitioningStore {
38
3
    pub fn new(spec: &SizePartitioningSpec, lower_store: Store, upper_store: Store) -> Arc<Self> {
39
3
        Arc::new(SizePartitioningStore {
40
3
            partition_size: spec.size,
41
3
            lower_store,
42
3
            upper_store,
43
3
        })
44
3
    }
45
}
46
47
#[async_trait]
48
impl StoreDriver for SizePartitioningStore {
49
    async fn has_with_results(
50
        self: Pin<&Self>,
51
        keys: &[StoreKey<'_>],
52
        results: &mut [Option<u64>],
53
2
    ) -> Result<(), Error> {
54
2
        let mut non_digest_sample = None;
55
2
        let (lower_digests, upper_digests): (Vec<_>, Vec<_>) =
56
2
            keys.iter().map(StoreKey::borrow).partition(|k| {
57
2
                let StoreKey::Digest(digest) = k else {
  Branch (57:21): [True: 2, False: 0]
  Branch (57:21): [Folded - Ignored]
58
0
                    non_digest_sample = Some(k.borrow().into_owned());
59
0
                    return false;
60
                };
61
2
                digest.size_bytes() < self.partition_size
62
2
            });
63
2
        if let Some(
non_digest0
) = non_digest_sample {
  Branch (63:16): [True: 0, False: 2]
  Branch (63:16): [Folded - Ignored]
64
0
            return Err(make_input_err!(
65
0
                "SizePartitioningStore only supports Digest keys, got {non_digest:?}"
66
0
            ));
67
2
        }
68
2
        let (lower_results, upper_results) = join!(
69
2
            self.lower_store.has_many(&lower_digests),
70
2
            self.upper_store.has_many(&upper_digests),
71
2
        );
72
2
        let mut lower_results = match lower_results {
73
2
            Ok(lower_results) => lower_results.into_iter(),
74
0
            Err(err) => match upper_results {
75
0
                Ok(_) => return Err(err),
76
0
                Err(upper_err) => return Err(err.merge(upper_err)),
77
            },
78
        };
79
2
        let mut upper_digests = upper_digests.into_iter().peekable();
80
2
        let mut upper_results = upper_results
?0
.into_iter();
81
2
        for (digest, result) in keys.iter().zip(results.iter_mut()) {
82
2
            if Some(digest) == upper_digests.peek() {
  Branch (82:16): [True: 1, False: 1]
  Branch (82:16): [Folded - Ignored]
83
1
                upper_digests.next();
84
1
                *result = upper_results
85
1
                    .next()
86
1
                    .err_tip(|| 
"upper_results out of sync with upper_digests"0
)
?0
;
87
            } else {
88
1
                *result = lower_results
89
1
                    .next()
90
1
                    .err_tip(|| 
"lower_results out of sync with lower_digests"0
)
?0
;
91
            }
92
        }
93
2
        Ok(())
94
4
    }
95
96
    async fn update(
97
        self: Pin<&Self>,
98
        key: StoreKey<'_>,
99
        reader: DropCloserReadHalf,
100
        size_info: UploadSizeInfo,
101
2
    ) -> Result<(), Error> {
102
2
        let digest = match key {
103
2
            StoreKey::Digest(digest) => digest,
104
0
            other @ StoreKey::Str(_) => {
105
0
                return Err(make_input_err!(
106
0
                    "SizePartitioningStore only supports Digest keys, got {other:?}"
107
0
                ))
108
            }
109
        };
110
2
        if digest.size_bytes() < self.partition_size {
  Branch (110:12): [True: 1, False: 1]
  Branch (110:12): [Folded - Ignored]
111
1
            return self.lower_store.update(digest, reader, size_info).
await0
;
112
1
        }
113
1
        self.upper_store.update(digest, reader, size_info).
await0
114
4
    }
115
116
    async fn get_part(
117
        self: Pin<&Self>,
118
        key: StoreKey<'_>,
119
        writer: &mut DropCloserWriteHalf,
120
        offset: u64,
121
        length: Option<u64>,
122
2
    ) -> Result<(), Error> {
123
2
        let digest = match key {
124
2
            StoreKey::Digest(digest) => digest,
125
0
            other @ StoreKey::Str(_) => {
126
0
                return Err(make_input_err!(
127
0
                    "SizePartitioningStore only supports Digest keys, got {other:?}"
128
0
                ))
129
            }
130
        };
131
2
        if digest.size_bytes() < self.partition_size {
  Branch (131:12): [True: 1, False: 1]
  Branch (131:12): [Folded - Ignored]
132
1
            return self
133
1
                .lower_store
134
1
                .get_part(digest, writer, offset, length)
135
0
                .await;
136
1
        }
137
1
        self.upper_store
138
1
            .get_part(digest, writer, offset, length)
139
0
            .await
140
4
    }
141
142
0
    fn inner_store(&self, key: Option<StoreKey>) -> &'_ dyn StoreDriver {
143
0
        let Some(key) = key else {
  Branch (143:13): [True: 0, False: 0]
  Branch (143:13): [Folded - Ignored]
144
0
            return self;
145
        };
146
0
        let StoreKey::Digest(digest) = key else {
  Branch (146:13): [True: 0, False: 0]
  Branch (146:13): [Folded - Ignored]
147
0
            return self;
148
        };
149
0
        if digest.size_bytes() < self.partition_size {
  Branch (149:12): [True: 0, False: 0]
  Branch (149:12): [Folded - Ignored]
150
0
            return self.lower_store.inner_store(Some(digest));
151
0
        }
152
0
        self.upper_store.inner_store(Some(digest))
153
0
    }
154
155
0
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
156
0
        self
157
0
    }
158
159
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
160
0
        self
161
0
    }
162
}
163
164
default_health_status_indicator!(SizePartitioningStore);