/build/source/nativelink-store/src/size_partitioning_store.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::pin::Pin; |
16 | | use std::sync::Arc; |
17 | | |
18 | | use async_trait::async_trait; |
19 | | use nativelink_error::{make_input_err, Error, ResultExt}; |
20 | | use nativelink_metric::MetricsComponent; |
21 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
22 | | use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; |
23 | | use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo}; |
24 | | use tokio::join; |
25 | | |
26 | 0 | #[derive(MetricsComponent)] |
27 | | pub struct SizePartitioningStore { |
28 | | #[metric(help = "Size to partition our data")] |
29 | | partition_size: u64, |
30 | | #[metric(group = "lower_store")] |
31 | | lower_store: Store, |
32 | | #[metric(group = "upper_store")] |
33 | | upper_store: Store, |
34 | | } |
35 | | |
36 | | impl SizePartitioningStore { |
37 | 3 | pub fn new( |
38 | 3 | config: &nativelink_config::stores::SizePartitioningStore, |
39 | 3 | lower_store: Store, |
40 | 3 | upper_store: Store, |
41 | 3 | ) -> Arc<Self> { |
42 | 3 | Arc::new(SizePartitioningStore { |
43 | 3 | partition_size: config.size, |
44 | 3 | lower_store, |
45 | 3 | upper_store, |
46 | 3 | }) |
47 | 3 | } |
48 | | } |
49 | | |
50 | | #[async_trait] |
51 | | impl StoreDriver for SizePartitioningStore { |
52 | | async fn has_with_results( |
53 | | self: Pin<&Self>, |
54 | | keys: &[StoreKey<'_>], |
55 | | results: &mut [Option<u64>], |
56 | 2 | ) -> Result<(), Error> { |
57 | 2 | let mut non_digest_sample = None; |
58 | 2 | let (lower_digests, upper_digests): (Vec<_>, Vec<_>) = |
59 | 2 | keys.iter().map(StoreKey::borrow).partition(|k| { |
60 | 2 | let StoreKey::Digest(digest) = k else { Branch (60:21): [True: 2, False: 0]
Branch (60:21): [Folded - Ignored]
|
61 | 0 | non_digest_sample = Some(k.borrow().into_owned()); |
62 | 0 | return false; |
63 | | }; |
64 | 2 | digest.size_bytes() < self.partition_size |
65 | 2 | }); |
66 | 2 | if let Some(non_digest0 ) = non_digest_sample { Branch (66:16): [True: 0, False: 2]
Branch (66:16): [Folded - Ignored]
|
67 | 0 | return Err(make_input_err!( |
68 | 0 | "SizePartitioningStore only supports Digest keys, got {non_digest:?}" |
69 | 0 | )); |
70 | 2 | } |
71 | 2 | let (lower_results, upper_results) = join!( |
72 | 2 | self.lower_store.has_many(&lower_digests), |
73 | 2 | self.upper_store.has_many(&upper_digests), |
74 | 2 | ); |
75 | 2 | let mut lower_results = match lower_results { |
76 | 2 | Ok(lower_results) => lower_results.into_iter(), |
77 | 0 | Err(err) => match upper_results { |
78 | 0 | Ok(_) => return Err(err), |
79 | 0 | Err(upper_err) => return Err(err.merge(upper_err)), |
80 | | }, |
81 | | }; |
82 | 2 | let mut upper_digests = upper_digests.into_iter().peekable(); |
83 | 2 | let mut upper_results = upper_results?0 .into_iter(); |
84 | 2 | for (digest, result) in keys.iter().zip(results.iter_mut()) { |
85 | 2 | if Some(digest) == upper_digests.peek() { Branch (85:16): [True: 1, False: 1]
Branch (85:16): [Folded - Ignored]
|
86 | 1 | upper_digests.next(); |
87 | 1 | *result = upper_results |
88 | 1 | .next() |
89 | 1 | .err_tip(|| "upper_results out of sync with upper_digests"0 )?0 ; |
90 | | } else { |
91 | 1 | *result = lower_results |
92 | 1 | .next() |
93 | 1 | .err_tip(|| "lower_results out of sync with lower_digests"0 )?0 ; |
94 | | } |
95 | | } |
96 | 2 | Ok(()) |
97 | 4 | } |
98 | | |
99 | | async fn update( |
100 | | self: Pin<&Self>, |
101 | | key: StoreKey<'_>, |
102 | | reader: DropCloserReadHalf, |
103 | | size_info: UploadSizeInfo, |
104 | 2 | ) -> Result<(), Error> { |
105 | 2 | let digest = match key { |
106 | 2 | StoreKey::Digest(digest) => digest, |
107 | 0 | other @ StoreKey::Str(_) => { |
108 | 0 | return Err(make_input_err!( |
109 | 0 | "SizePartitioningStore only supports Digest keys, got {other:?}" |
110 | 0 | )) |
111 | | } |
112 | | }; |
113 | 2 | if digest.size_bytes() < self.partition_size { Branch (113:12): [True: 1, False: 1]
Branch (113:12): [Folded - Ignored]
|
114 | 1 | return self.lower_store.update(digest, reader, size_info).await0 ; |
115 | 1 | } |
116 | 1 | self.upper_store.update(digest, reader, size_info).await0 |
117 | 4 | } |
118 | | |
119 | | async fn get_part( |
120 | | self: Pin<&Self>, |
121 | | key: StoreKey<'_>, |
122 | | writer: &mut DropCloserWriteHalf, |
123 | | offset: u64, |
124 | | length: Option<u64>, |
125 | 2 | ) -> Result<(), Error> { |
126 | 2 | let digest = match key { |
127 | 2 | StoreKey::Digest(digest) => digest, |
128 | 0 | other @ StoreKey::Str(_) => { |
129 | 0 | return Err(make_input_err!( |
130 | 0 | "SizePartitioningStore only supports Digest keys, got {other:?}" |
131 | 0 | )) |
132 | | } |
133 | | }; |
134 | 2 | if digest.size_bytes() < self.partition_size { Branch (134:12): [True: 1, False: 1]
Branch (134:12): [Folded - Ignored]
|
135 | 1 | return self |
136 | 1 | .lower_store |
137 | 1 | .get_part(digest, writer, offset, length) |
138 | 0 | .await; |
139 | 1 | } |
140 | 1 | self.upper_store |
141 | 1 | .get_part(digest, writer, offset, length) |
142 | 0 | .await |
143 | 4 | } |
144 | | |
145 | 0 | fn inner_store(&self, key: Option<StoreKey>) -> &'_ dyn StoreDriver { |
146 | 0 | let Some(key) = key else { Branch (146:13): [True: 0, False: 0]
Branch (146:13): [Folded - Ignored]
|
147 | 0 | return self; |
148 | | }; |
149 | 0 | let StoreKey::Digest(digest) = key else { Branch (149:13): [True: 0, False: 0]
Branch (149:13): [Folded - Ignored]
|
150 | 0 | return self; |
151 | | }; |
152 | 0 | if digest.size_bytes() < self.partition_size { Branch (152:12): [True: 0, False: 0]
Branch (152:12): [Folded - Ignored]
|
153 | 0 | return self.lower_store.inner_store(Some(digest)); |
154 | 0 | } |
155 | 0 | self.upper_store.inner_store(Some(digest)) |
156 | 0 | } |
157 | | |
158 | 0 | fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) { |
159 | 0 | self |
160 | 0 | } |
161 | | |
162 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> { |
163 | 0 | self |
164 | 0 | } |
165 | | } |
166 | | |
167 | | default_health_status_indicator!(SizePartitioningStore); |