/build/source/nativelink-store/src/size_partitioning_store.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::pin::Pin; |
16 | | use std::sync::Arc; |
17 | | |
18 | | use async_trait::async_trait; |
19 | | use futures::try_join; |
20 | | use nativelink_config::stores::SizePartitioningSpec; |
21 | | use nativelink_error::{Error, ResultExt, make_input_err}; |
22 | | use nativelink_metric::MetricsComponent; |
23 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
24 | | use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator}; |
25 | | use nativelink_util::store_trait::{ |
26 | | RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo, |
27 | | }; |
28 | | use tokio::join; |
29 | | |
30 | | #[derive(Debug, MetricsComponent)] |
31 | | pub struct SizePartitioningStore { |
32 | | #[metric(help = "Size to partition our data")] |
33 | | partition_size: u64, |
34 | | #[metric(group = "lower_store")] |
35 | | lower_store: Store, |
36 | | #[metric(group = "upper_store")] |
37 | | upper_store: Store, |
38 | | } |
39 | | |
40 | | impl SizePartitioningStore { |
41 | 3 | pub fn new(spec: &SizePartitioningSpec, lower_store: Store, upper_store: Store) -> Arc<Self> { |
42 | 3 | Arc::new(Self { |
43 | 3 | partition_size: spec.size, |
44 | 3 | lower_store, |
45 | 3 | upper_store, |
46 | 3 | }) |
47 | 3 | } |
48 | | } |
49 | | |
50 | | #[async_trait] |
51 | | impl StoreDriver for SizePartitioningStore { |
52 | 0 | async fn post_init(self: Arc<Self>) -> Result<(), Error> { |
53 | | try_join!( |
54 | | self.upper_store.clone().into_inner().post_init(), |
55 | | self.lower_store.clone().into_inner().post_init(), |
56 | | )?; |
57 | | Ok(()) |
58 | 0 | } |
59 | | |
60 | | async fn has_with_results( |
61 | | self: Pin<&Self>, |
62 | | keys: &[StoreKey<'_>], |
63 | | results: &mut [Option<u64>], |
64 | 2 | ) -> Result<(), Error> { |
65 | | let mut non_digest_sample = None; |
66 | | let (lower_digests, upper_digests): (Vec<_>, Vec<_>) = |
67 | 2 | keys.iter().map(StoreKey::borrow).partition(|k| { |
68 | 2 | let StoreKey::Digest(digest) = k else { |
69 | 0 | non_digest_sample = Some(k.borrow().into_owned()); |
70 | 0 | return false; |
71 | | }; |
72 | 2 | digest.size_bytes() < self.partition_size |
73 | 2 | }); |
74 | | if let Some(non_digest) = non_digest_sample { |
75 | | return Err(make_input_err!( |
76 | | "SizePartitioningStore only supports Digest keys, got {non_digest:?}" |
77 | | )); |
78 | | } |
79 | | let (lower_results, upper_results) = join!( |
80 | | self.lower_store.has_many(&lower_digests), |
81 | | self.upper_store.has_many(&upper_digests), |
82 | | ); |
83 | | let mut lower_results = match lower_results { |
84 | | Ok(lower_results) => lower_results.into_iter(), |
85 | | Err(err) => match upper_results { |
86 | | Ok(_) => return Err(err), |
87 | | Err(upper_err) => return Err(err.merge(upper_err)), |
88 | | }, |
89 | | }; |
90 | | let mut upper_digests = upper_digests.into_iter().peekable(); |
91 | | let mut upper_results = upper_results?.into_iter(); |
92 | | for (digest, result) in keys.iter().zip(results.iter_mut()) { |
93 | | if Some(digest) == upper_digests.peek() { |
94 | | upper_digests.next(); |
95 | | *result = upper_results |
96 | | .next() |
97 | | .err_tip(|| "upper_results out of sync with upper_digests")?; |
98 | | } else { |
99 | | *result = lower_results |
100 | | .next() |
101 | | .err_tip(|| "lower_results out of sync with lower_digests")?; |
102 | | } |
103 | | } |
104 | | Ok(()) |
105 | 2 | } |
106 | | |
107 | | async fn update( |
108 | | self: Pin<&Self>, |
109 | | key: StoreKey<'_>, |
110 | | reader: DropCloserReadHalf, |
111 | | size_info: UploadSizeInfo, |
112 | 2 | ) -> Result<u64, Error> { |
113 | | let digest = match key { |
114 | | StoreKey::Digest(digest) => digest, |
115 | | other @ StoreKey::Str(_) => { |
116 | | return Err(make_input_err!( |
117 | | "SizePartitioningStore only supports Digest keys, got {other:?}" |
118 | | )); |
119 | | } |
120 | | }; |
121 | | if digest.size_bytes() < self.partition_size { |
122 | | return self.lower_store.update(digest, reader, size_info).await; |
123 | | } |
124 | | self.upper_store.update(digest, reader, size_info).await |
125 | 2 | } |
126 | | |
127 | | async fn get_part( |
128 | | self: Pin<&Self>, |
129 | | key: StoreKey<'_>, |
130 | | writer: &mut DropCloserWriteHalf, |
131 | | offset: u64, |
132 | | length: Option<u64>, |
133 | 2 | ) -> Result<(), Error> { |
134 | | let digest = match key { |
135 | | StoreKey::Digest(digest) => digest, |
136 | | other @ StoreKey::Str(_) => { |
137 | | return Err(make_input_err!( |
138 | | "SizePartitioningStore only supports Digest keys, got {other:?}" |
139 | | )); |
140 | | } |
141 | | }; |
142 | | if digest.size_bytes() < self.partition_size { |
143 | | return self |
144 | | .lower_store |
145 | | .get_part(digest, writer, offset, length) |
146 | | .await; |
147 | | } |
148 | | self.upper_store |
149 | | .get_part(digest, writer, offset, length) |
150 | | .await |
151 | 2 | } |
152 | | |
153 | 0 | fn inner_store(&self, key: Option<StoreKey>) -> &'_ dyn StoreDriver { |
154 | 0 | let Some(key) = key else { |
155 | 0 | return self; |
156 | | }; |
157 | 0 | let StoreKey::Digest(digest) = key else { |
158 | 0 | return self; |
159 | | }; |
160 | 0 | if digest.size_bytes() < self.partition_size { |
161 | 0 | return self.lower_store.inner_store(Some(digest)); |
162 | 0 | } |
163 | 0 | self.upper_store.inner_store(Some(digest)) |
164 | 0 | } |
165 | | |
166 | 0 | fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) { |
167 | 0 | self |
168 | 0 | } |
169 | | |
170 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> { |
171 | 0 | self |
172 | 0 | } |
173 | | |
174 | 0 | fn register_remove_callback( |
175 | 0 | self: Arc<Self>, |
176 | 0 | callback: Arc<dyn RemoveItemCallback>, |
177 | 0 | ) -> Result<(), Error> { |
178 | 0 | self.lower_store |
179 | 0 | .register_remove_callback(callback.clone())?; |
180 | 0 | self.upper_store.register_remove_callback(callback)?; |
181 | 0 | Ok(()) |
182 | 0 | } |
183 | | } |
184 | | |
185 | | default_health_status_indicator!(SizePartitioningStore); |