/build/source/nativelink-store/src/size_partitioning_store.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::pin::Pin; |
16 | | use std::sync::Arc; |
17 | | |
18 | | use async_trait::async_trait; |
19 | | use nativelink_config::stores::SizePartitioningSpec; |
20 | | use nativelink_error::{make_input_err, Error, ResultExt}; |
21 | | use nativelink_metric::MetricsComponent; |
22 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
23 | | use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; |
24 | | use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo}; |
25 | | use tokio::join; |
26 | | |
27 | | #[derive(MetricsComponent)] |
28 | | pub struct SizePartitioningStore { |
29 | | #[metric(help = "Size to partition our data")] |
30 | | partition_size: u64, |
31 | | #[metric(group = "lower_store")] |
32 | | lower_store: Store, |
33 | | #[metric(group = "upper_store")] |
34 | | upper_store: Store, |
35 | | } |
36 | | |
37 | | impl SizePartitioningStore { |
38 | 3 | pub fn new(spec: &SizePartitioningSpec, lower_store: Store, upper_store: Store) -> Arc<Self> { |
39 | 3 | Arc::new(SizePartitioningStore { |
40 | 3 | partition_size: spec.size, |
41 | 3 | lower_store, |
42 | 3 | upper_store, |
43 | 3 | }) |
44 | 3 | } |
45 | | } |
46 | | |
47 | | #[async_trait] |
48 | | impl StoreDriver for SizePartitioningStore { |
49 | | async fn has_with_results( |
50 | | self: Pin<&Self>, |
51 | | keys: &[StoreKey<'_>], |
52 | | results: &mut [Option<u64>], |
53 | 2 | ) -> Result<(), Error> { |
54 | 2 | let mut non_digest_sample = None; |
55 | 2 | let (lower_digests, upper_digests): (Vec<_>, Vec<_>) = |
56 | 2 | keys.iter().map(StoreKey::borrow).partition(|k| { |
57 | 2 | let StoreKey::Digest(digest) = k else { Branch (57:21): [True: 2, False: 0]
Branch (57:21): [Folded - Ignored]
|
58 | 0 | non_digest_sample = Some(k.borrow().into_owned()); |
59 | 0 | return false; |
60 | | }; |
61 | 2 | digest.size_bytes() < self.partition_size |
62 | 2 | }); |
63 | 2 | if let Some(non_digest0 ) = non_digest_sample { Branch (63:16): [True: 0, False: 2]
Branch (63:16): [Folded - Ignored]
|
64 | 0 | return Err(make_input_err!( |
65 | 0 | "SizePartitioningStore only supports Digest keys, got {non_digest:?}" |
66 | 0 | )); |
67 | 2 | } |
68 | 2 | let (lower_results, upper_results) = join!( |
69 | 2 | self.lower_store.has_many(&lower_digests), |
70 | 2 | self.upper_store.has_many(&upper_digests), |
71 | 2 | ); |
72 | 2 | let mut lower_results = match lower_results { |
73 | 2 | Ok(lower_results) => lower_results.into_iter(), |
74 | 0 | Err(err) => match upper_results { |
75 | 0 | Ok(_) => return Err(err), |
76 | 0 | Err(upper_err) => return Err(err.merge(upper_err)), |
77 | | }, |
78 | | }; |
79 | 2 | let mut upper_digests = upper_digests.into_iter().peekable(); |
80 | 2 | let mut upper_results = upper_results?0 .into_iter(); |
81 | 2 | for (digest, result) in keys.iter().zip(results.iter_mut()) { |
82 | 2 | if Some(digest) == upper_digests.peek() { Branch (82:16): [True: 1, False: 1]
Branch (82:16): [Folded - Ignored]
|
83 | 1 | upper_digests.next(); |
84 | 1 | *result = upper_results |
85 | 1 | .next() |
86 | 1 | .err_tip(|| "upper_results out of sync with upper_digests"0 )?0 ; |
87 | | } else { |
88 | 1 | *result = lower_results |
89 | 1 | .next() |
90 | 1 | .err_tip(|| "lower_results out of sync with lower_digests"0 )?0 ; |
91 | | } |
92 | | } |
93 | 2 | Ok(()) |
94 | 4 | } |
95 | | |
96 | | async fn update( |
97 | | self: Pin<&Self>, |
98 | | key: StoreKey<'_>, |
99 | | reader: DropCloserReadHalf, |
100 | | size_info: UploadSizeInfo, |
101 | 2 | ) -> Result<(), Error> { |
102 | 2 | let digest = match key { |
103 | 2 | StoreKey::Digest(digest) => digest, |
104 | 0 | other @ StoreKey::Str(_) => { |
105 | 0 | return Err(make_input_err!( |
106 | 0 | "SizePartitioningStore only supports Digest keys, got {other:?}" |
107 | 0 | )) |
108 | | } |
109 | | }; |
110 | 2 | if digest.size_bytes() < self.partition_size { Branch (110:12): [True: 1, False: 1]
Branch (110:12): [Folded - Ignored]
|
111 | 1 | return self.lower_store.update(digest, reader, size_info).await; |
112 | 1 | } |
113 | 1 | self.upper_store.update(digest, reader, size_info).await |
114 | 4 | } |
115 | | |
116 | | async fn get_part( |
117 | | self: Pin<&Self>, |
118 | | key: StoreKey<'_>, |
119 | | writer: &mut DropCloserWriteHalf, |
120 | | offset: u64, |
121 | | length: Option<u64>, |
122 | 2 | ) -> Result<(), Error> { |
123 | 2 | let digest = match key { |
124 | 2 | StoreKey::Digest(digest) => digest, |
125 | 0 | other @ StoreKey::Str(_) => { |
126 | 0 | return Err(make_input_err!( |
127 | 0 | "SizePartitioningStore only supports Digest keys, got {other:?}" |
128 | 0 | )) |
129 | | } |
130 | | }; |
131 | 2 | if digest.size_bytes() < self.partition_size { Branch (131:12): [True: 1, False: 1]
Branch (131:12): [Folded - Ignored]
|
132 | 1 | return self |
133 | 1 | .lower_store |
134 | 1 | .get_part(digest, writer, offset, length) |
135 | 1 | .await; |
136 | 1 | } |
137 | 1 | self.upper_store |
138 | 1 | .get_part(digest, writer, offset, length) |
139 | 1 | .await |
140 | 4 | } |
141 | | |
142 | 0 | fn inner_store(&self, key: Option<StoreKey>) -> &'_ dyn StoreDriver { |
143 | 0 | let Some(key) = key else { Branch (143:13): [True: 0, False: 0]
Branch (143:13): [Folded - Ignored]
|
144 | 0 | return self; |
145 | | }; |
146 | 0 | let StoreKey::Digest(digest) = key else { Branch (146:13): [True: 0, False: 0]
Branch (146:13): [Folded - Ignored]
|
147 | 0 | return self; |
148 | | }; |
149 | 0 | if digest.size_bytes() < self.partition_size { Branch (149:12): [True: 0, False: 0]
Branch (149:12): [Folded - Ignored]
|
150 | 0 | return self.lower_store.inner_store(Some(digest)); |
151 | 0 | } |
152 | 0 | self.upper_store.inner_store(Some(digest)) |
153 | 0 | } |
154 | | |
155 | 0 | fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) { |
156 | 0 | self |
157 | 0 | } |
158 | | |
159 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> { |
160 | 0 | self |
161 | 0 | } |
162 | | } |
163 | | |
164 | | default_health_status_indicator!(SizePartitioningStore); |