/build/source/nativelink-store/src/size_partitioning_store.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::pin::Pin; |
16 | | use std::sync::Arc; |
17 | | |
18 | | use async_trait::async_trait; |
19 | | use nativelink_config::stores::SizePartitioningSpec; |
20 | | use nativelink_error::{Error, ResultExt, make_input_err}; |
21 | | use nativelink_metric::MetricsComponent; |
22 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
23 | | use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator}; |
24 | | use nativelink_util::store_trait::{ |
25 | | RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo, |
26 | | }; |
27 | | use tokio::join; |
28 | | |
29 | | #[derive(Debug, MetricsComponent)] |
30 | | pub struct SizePartitioningStore { |
31 | | #[metric(help = "Size to partition our data")] |
32 | | partition_size: u64, |
33 | | #[metric(group = "lower_store")] |
34 | | lower_store: Store, |
35 | | #[metric(group = "upper_store")] |
36 | | upper_store: Store, |
37 | | } |
38 | | |
39 | | impl SizePartitioningStore { |
40 | 3 | pub fn new(spec: &SizePartitioningSpec, lower_store: Store, upper_store: Store) -> Arc<Self> { |
41 | 3 | Arc::new(Self { |
42 | 3 | partition_size: spec.size, |
43 | 3 | lower_store, |
44 | 3 | upper_store, |
45 | 3 | }) |
46 | 3 | } |
47 | | } |
48 | | |
49 | | #[async_trait] |
50 | | impl StoreDriver for SizePartitioningStore { |
51 | | async fn has_with_results( |
52 | | self: Pin<&Self>, |
53 | | keys: &[StoreKey<'_>], |
54 | | results: &mut [Option<u64>], |
55 | 2 | ) -> Result<(), Error> { |
56 | | let mut non_digest_sample = None; |
57 | | let (lower_digests, upper_digests): (Vec<_>, Vec<_>) = |
58 | 2 | keys.iter().map(StoreKey::borrow).partition(|k| { |
59 | 2 | let StoreKey::Digest(digest) = k else { Branch (59:21): [True: 2, False: 0]
Branch (59:21): [Folded - Ignored]
|
60 | 0 | non_digest_sample = Some(k.borrow().into_owned()); |
61 | 0 | return false; |
62 | | }; |
63 | 2 | digest.size_bytes() < self.partition_size |
64 | 2 | }); |
65 | | if let Some(non_digest) = non_digest_sample { |
66 | | return Err(make_input_err!( |
67 | | "SizePartitioningStore only supports Digest keys, got {non_digest:?}" |
68 | | )); |
69 | | } |
70 | | let (lower_results, upper_results) = join!( |
71 | | self.lower_store.has_many(&lower_digests), |
72 | | self.upper_store.has_many(&upper_digests), |
73 | | ); |
74 | | let mut lower_results = match lower_results { |
75 | | Ok(lower_results) => lower_results.into_iter(), |
76 | | Err(err) => match upper_results { |
77 | | Ok(_) => return Err(err), |
78 | | Err(upper_err) => return Err(err.merge(upper_err)), |
79 | | }, |
80 | | }; |
81 | | let mut upper_digests = upper_digests.into_iter().peekable(); |
82 | | let mut upper_results = upper_results?.into_iter(); |
83 | | for (digest, result) in keys.iter().zip(results.iter_mut()) { |
84 | | if Some(digest) == upper_digests.peek() { |
85 | | upper_digests.next(); |
86 | | *result = upper_results |
87 | | .next() |
88 | | .err_tip(|| "upper_results out of sync with upper_digests")?; |
89 | | } else { |
90 | | *result = lower_results |
91 | | .next() |
92 | | .err_tip(|| "lower_results out of sync with lower_digests")?; |
93 | | } |
94 | | } |
95 | | Ok(()) |
96 | 2 | } |
97 | | |
98 | | async fn update( |
99 | | self: Pin<&Self>, |
100 | | key: StoreKey<'_>, |
101 | | reader: DropCloserReadHalf, |
102 | | size_info: UploadSizeInfo, |
103 | 2 | ) -> Result<(), Error> { |
104 | | let digest = match key { |
105 | | StoreKey::Digest(digest) => digest, |
106 | | other @ StoreKey::Str(_) => { |
107 | | return Err(make_input_err!( |
108 | | "SizePartitioningStore only supports Digest keys, got {other:?}" |
109 | | )); |
110 | | } |
111 | | }; |
112 | | if digest.size_bytes() < self.partition_size { |
113 | | return self.lower_store.update(digest, reader, size_info).await; |
114 | | } |
115 | | self.upper_store.update(digest, reader, size_info).await |
116 | 2 | } |
117 | | |
118 | | async fn get_part( |
119 | | self: Pin<&Self>, |
120 | | key: StoreKey<'_>, |
121 | | writer: &mut DropCloserWriteHalf, |
122 | | offset: u64, |
123 | | length: Option<u64>, |
124 | 2 | ) -> Result<(), Error> { |
125 | | let digest = match key { |
126 | | StoreKey::Digest(digest) => digest, |
127 | | other @ StoreKey::Str(_) => { |
128 | | return Err(make_input_err!( |
129 | | "SizePartitioningStore only supports Digest keys, got {other:?}" |
130 | | )); |
131 | | } |
132 | | }; |
133 | | if digest.size_bytes() < self.partition_size { |
134 | | return self |
135 | | .lower_store |
136 | | .get_part(digest, writer, offset, length) |
137 | | .await; |
138 | | } |
139 | | self.upper_store |
140 | | .get_part(digest, writer, offset, length) |
141 | | .await |
142 | 2 | } |
143 | | |
144 | 0 | fn inner_store(&self, key: Option<StoreKey>) -> &'_ dyn StoreDriver { |
145 | 0 | let Some(key) = key else { Branch (145:13): [True: 0, False: 0]
Branch (145:13): [Folded - Ignored]
|
146 | 0 | return self; |
147 | | }; |
148 | 0 | let StoreKey::Digest(digest) = key else { Branch (148:13): [True: 0, False: 0]
Branch (148:13): [Folded - Ignored]
|
149 | 0 | return self; |
150 | | }; |
151 | 0 | if digest.size_bytes() < self.partition_size { Branch (151:12): [True: 0, False: 0]
Branch (151:12): [Folded - Ignored]
|
152 | 0 | return self.lower_store.inner_store(Some(digest)); |
153 | 0 | } |
154 | 0 | self.upper_store.inner_store(Some(digest)) |
155 | 0 | } |
156 | | |
157 | 0 | fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) { |
158 | 0 | self |
159 | 0 | } |
160 | | |
161 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> { |
162 | 0 | self |
163 | 0 | } |
164 | | |
165 | 0 | fn register_remove_callback( |
166 | 0 | self: Arc<Self>, |
167 | 0 | callback: Arc<dyn RemoveItemCallback>, |
168 | 0 | ) -> Result<(), Error> { |
169 | 0 | self.lower_store |
170 | 0 | .register_remove_callback(callback.clone())?; |
171 | 0 | self.upper_store.register_remove_callback(callback)?; |
172 | 0 | Ok(()) |
173 | 0 | } |
174 | | } |
175 | | |
176 | | default_health_status_indicator!(SizePartitioningStore); |