Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-store/src/size_partitioning_store.rs
Line | Count | Source
-----|-------|-------
   1 |       | // Copyright 2024 The NativeLink Authors. All rights reserved.
   2 |       | //
   3 |       | // Licensed under the Apache License, Version 2.0 (the "License");
   4 |       | // you may not use this file except in compliance with the License.
   5 |       | // You may obtain a copy of the License at
   6 |       | //
   7 |       | //    http://www.apache.org/licenses/LICENSE-2.0
   8 |       | //
   9 |       | // Unless required by applicable law or agreed to in writing, software
  10 |       | // distributed under the License is distributed on an "AS IS" BASIS,
  11 |       | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12 |       | // See the License for the specific language governing permissions and
  13 |       | // limitations under the License.
  14 |       |
  15 |       | use std::pin::Pin;
  16 |       | use std::sync::Arc;
  17 |       |
  18 |       | use async_trait::async_trait;
  19 |       | use nativelink_error::{make_input_err, Error, ResultExt};
  20 |       | use nativelink_metric::MetricsComponent;
  21 |       | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
  22 |       | use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
  23 |       | use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
  24 |       | use tokio::join;
  25 |       |
  26 |     0 | #[derive(MetricsComponent)]
  27 |       | pub struct SizePartitioningStore {
  28 |       |     #[metric(help = "Size to partition our data")]
  29 |       |     partition_size: u64,
  30 |       |     #[metric(group = "lower_store")]
  31 |       |     lower_store: Store,
  32 |       |     #[metric(group = "upper_store")]
  33 |       |     upper_store: Store,
  34 |       | }
  35 |       |
  36 |       | impl SizePartitioningStore {
  37 |     3 |     pub fn new(
  38 |     3 |         config: &nativelink_config::stores::SizePartitioningStore,
  39 |     3 |         lower_store: Store,
  40 |     3 |         upper_store: Store,
  41 |     3 |     ) -> Arc<Self> {
  42 |     3 |         Arc::new(SizePartitioningStore {
  43 |     3 |             partition_size: config.size,
  44 |     3 |             lower_store,
  45 |     3 |             upper_store,
  46 |     3 |         })
  47 |     3 |     }
  48 |       | }
  49 |       |
  50 |       | #[async_trait]
  51 |       | impl StoreDriver for SizePartitioningStore {
  52 |       |     async fn has_with_results(
  53 |       |         self: Pin<&Self>,
  54 |       |         keys: &[StoreKey<'_>],
  55 |       |         results: &mut [Option<u64>],
  56 |     2 |     ) -> Result<(), Error> {
  57 |     2 |         let mut non_digest_sample = None;
  58 |     2 |         let (lower_digests, upper_digests): (Vec<_>, Vec<_>) =
  59 |     2 |             keys.iter().map(StoreKey::borrow).partition(|k| {
  60 |     2 |                 let StoreKey::Digest(digest) = k else {
     |       |   Branch (60:21): [True: 2, False: 0]
     |       |   Branch (60:21): [Folded - Ignored]
  61 |     0 |                     non_digest_sample = Some(k.borrow().into_owned());
  62 |     0 |                     return false;
  63 |       |                 };
  64 |     2 |                 digest.size_bytes() < self.partition_size
  65 |     2 |             });
  66 |     2 |         if let Some(non_digest) = non_digest_sample {
     |       |   Branch (66:16): [True: 0, False: 2]
     |       |   Branch (66:16): [Folded - Ignored]
  67 |     0 |             return Err(make_input_err!(
  68 |     0 |                 "SizePartitioningStore only supports Digest keys, got {non_digest:?}"
  69 |     0 |             ));
  70 |     2 |         }
  71 |     2 |         let (lower_results, upper_results) = join!(
  72 |     2 |             self.lower_store.has_many(&lower_digests),
  73 |     2 |             self.upper_store.has_many(&upper_digests),
  74 |     2 |         );
  75 |     2 |         let mut lower_results = match lower_results {
  76 |     2 |             Ok(lower_results) => lower_results.into_iter(),
  77 |     0 |             Err(err) => match upper_results {
  78 |     0 |                 Ok(_) => return Err(err),
  79 |     0 |                 Err(upper_err) => return Err(err.merge(upper_err)),
  80 |       |             },
  81 |       |         };
  82 |     2 |         let mut upper_digests = upper_digests.into_iter().peekable();
  83 |     2 |         let mut upper_results = upper_results?.into_iter();
  84 |     2 |         for (digest, result) in keys.iter().zip(results.iter_mut()) {
  85 |     2 |             if Some(digest) == upper_digests.peek() {
     |       |   Branch (85:16): [True: 1, False: 1]
     |       |   Branch (85:16): [Folded - Ignored]
  86 |     1 |                 upper_digests.next();
  87 |     1 |                 *result = upper_results
  88 |     1 |                     .next()
  89 |     1 |                     .err_tip(|| "upper_results out of sync with upper_digests")?;
  90 |       |             } else {
  91 |     1 |                 *result = lower_results
  92 |     1 |                     .next()
  93 |     1 |                     .err_tip(|| "lower_results out of sync with lower_digests")?;
  94 |       |             }
  95 |       |         }
  96 |     2 |         Ok(())
  97 |     4 |     }
  98 |       |
  99 |       |     async fn update(
 100 |       |         self: Pin<&Self>,
 101 |       |         key: StoreKey<'_>,
 102 |       |         reader: DropCloserReadHalf,
 103 |       |         size_info: UploadSizeInfo,
 104 |     2 |     ) -> Result<(), Error> {
 105 |     2 |         let digest = match key {
 106 |     2 |             StoreKey::Digest(digest) => digest,
 107 |     0 |             other @ StoreKey::Str(_) => {
 108 |     0 |                 return Err(make_input_err!(
 109 |     0 |                     "SizePartitioningStore only supports Digest keys, got {other:?}"
 110 |     0 |                 ))
 111 |       |             }
 112 |       |         };
 113 |     2 |         if digest.size_bytes() < self.partition_size {
     |       |   Branch (113:12): [True: 1, False: 1]
     |       |   Branch (113:12): [Folded - Ignored]
 114 |     1 |             return self.lower_store.update(digest, reader, size_info).await;
 115 |     1 |         }
 116 |     1 |         self.upper_store.update(digest, reader, size_info).await
 117 |     4 |     }
 118 |       |
 119 |       |     async fn get_part(
 120 |       |         self: Pin<&Self>,
 121 |       |         key: StoreKey<'_>,
 122 |       |         writer: &mut DropCloserWriteHalf,
 123 |       |         offset: u64,
 124 |       |         length: Option<u64>,
 125 |     2 |     ) -> Result<(), Error> {
 126 |     2 |         let digest = match key {
 127 |     2 |             StoreKey::Digest(digest) => digest,
 128 |     0 |             other @ StoreKey::Str(_) => {
 129 |     0 |                 return Err(make_input_err!(
 130 |     0 |                     "SizePartitioningStore only supports Digest keys, got {other:?}"
 131 |     0 |                 ))
 132 |       |             }
 133 |       |         };
 134 |     2 |         if digest.size_bytes() < self.partition_size {
     |       |   Branch (134:12): [True: 1, False: 1]
     |       |   Branch (134:12): [Folded - Ignored]
 135 |     1 |             return self
 136 |     1 |                 .lower_store
 137 |     1 |                 .get_part(digest, writer, offset, length)
 138 |     0 |                 .await;
 139 |     1 |         }
 140 |     1 |         self.upper_store
 141 |     1 |             .get_part(digest, writer, offset, length)
 142 |     0 |             .await
 143 |     4 |     }
 144 |       |
 145 |     0 |     fn inner_store(&self, key: Option<StoreKey>) -> &'_ dyn StoreDriver {
 146 |     0 |         let Some(key) = key else {
     |       |   Branch (146:13): [True: 0, False: 0]
     |       |   Branch (146:13): [Folded - Ignored]
 147 |     0 |             return self;
 148 |       |         };
 149 |     0 |         let StoreKey::Digest(digest) = key else {
     |       |   Branch (149:13): [True: 0, False: 0]
     |       |   Branch (149:13): [Folded - Ignored]
 150 |     0 |             return self;
 151 |       |         };
 152 |     0 |         if digest.size_bytes() < self.partition_size {
     |       |   Branch (152:12): [True: 0, False: 0]
     |       |   Branch (152:12): [Folded - Ignored]
 153 |     0 |             return self.lower_store.inner_store(Some(digest));
 154 |     0 |         }
 155 |     0 |         self.upper_store.inner_store(Some(digest))
 156 |     0 |     }
 157 |       |
 158 |     0 |     fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
 159 |     0 |         self
 160 |     0 |     }
 161 |       |
 162 |     0 |     fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
 163 |     0 |         self
 164 |     0 |     }
 165 |       | }
 166 |       |
 167 |       | default_health_status_indicator!(SizePartitioningStore);
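
Note on the covered branches: the counts above show both directions of the size-routing comparison exercised once each (source lines 113-116 and 134-142), while the non-Digest error arms and the inner_store, as_any, and as_any_arc helpers remain uncovered. The rule those branches test is the single comparison digest.size_bytes() < partition_size. Below is a minimal standalone sketch of that rule for reference; it is not NativeLink code, and the Partition enum and route function are names invented for this illustration.

// Standalone illustration of the routing rule exercised at source lines
// 113-116 and 134-142 above (illustrative sketch, not NativeLink code).
// Digests whose size is strictly below the configured partition size go to
// the lower store; everything at or above the boundary goes to the upper store.
#[derive(Debug, PartialEq)]
enum Partition {
    Lower,
    Upper,
}

// Mirrors the comparison `digest.size_bytes() < self.partition_size`.
fn route(size_bytes: u64, partition_size: u64) -> Partition {
    if size_bytes < partition_size {
        Partition::Lower
    } else {
        Partition::Upper
    }
}

fn main() {
    let partition_size = 1024;
    assert_eq!(route(0, partition_size), Partition::Lower);
    assert_eq!(route(1023, partition_size), Partition::Lower);
    // The boundary value is not strictly less than partition_size,
    // so it routes to the upper store.
    assert_eq!(route(1024, partition_size), Partition::Upper);
    assert_eq!(route(u64::MAX, partition_size), Partition::Upper);
    println!("routing checks passed");
}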