/build/source/nativelink-scheduler/src/cache_lookup_scheduler.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::collections::HashMap; |
16 | | use std::sync::Arc; |
17 | | |
18 | | use async_trait::async_trait; |
19 | | use nativelink_error::{make_err, Code, Error, ResultExt}; |
20 | | use nativelink_metric::{MetricsComponent, RootMetricsComponent}; |
21 | | use nativelink_proto::build::bazel::remote::execution::v2::{ |
22 | | ActionResult as ProtoActionResult, GetActionResultRequest, |
23 | | }; |
24 | | use nativelink_store::ac_utils::get_and_decode_digest; |
25 | | use nativelink_store::grpc_store::GrpcStore; |
26 | | use nativelink_util::action_messages::{ |
27 | | ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, OperationId, |
28 | | }; |
29 | | use nativelink_util::background_spawn; |
30 | | use nativelink_util::common::DigestInfo; |
31 | | use nativelink_util::digest_hasher::DigestHasherFunc; |
32 | | use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; |
33 | | use nativelink_util::operation_state_manager::{ |
34 | | ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter, |
35 | | }; |
36 | | use nativelink_util::store_trait::Store; |
37 | | use parking_lot::{Mutex, MutexGuard}; |
38 | | use scopeguard::guard; |
39 | | use tokio::sync::oneshot; |
40 | | use tonic::{Request, Response}; |
41 | | use tracing::{event, Level}; |
42 | | |
43 | | /// Actions that are having their cache checked or failed cache lookup and are |
44 | | /// being forwarded upstream. Missing the skip_cache_check actions which are |
45 | | /// forwarded directly. |
46 | | type CheckActions = HashMap< |
47 | | ActionUniqueKey, |
48 | | Vec<( |
49 | | OperationId, |
50 | | oneshot::Sender<Result<Box<dyn ActionStateResult>, Error>>, |
51 | | )>, |
52 | | >; |
53 | | |
54 | 0 | #[derive(MetricsComponent)] |
55 | | pub struct CacheLookupScheduler { |
56 | | /// A reference to the AC to find existing actions in. |
57 | | /// To prevent unintended issues, this store should probably be a CompletenessCheckingStore. |
58 | | #[metric(group = "ac_store")] |
59 | | ac_store: Store, |
60 | | /// The "real" scheduler to use to perform actions if they were not found |
61 | | /// in the action cache. |
62 | | #[metric(group = "action_scheduler")] |
63 | | action_scheduler: Arc<dyn ClientStateManager>, |
64 | | /// Actions that are currently performing a CacheCheck. |
65 | | inflight_cache_checks: Arc<Mutex<CheckActions>>, |
66 | | } |
67 | | |
68 | 0 | async fn get_action_from_store( |
69 | 0 | ac_store: &Store, |
70 | 0 | action_digest: DigestInfo, |
71 | 0 | instance_name: String, |
72 | 0 | digest_function: DigestHasherFunc, |
73 | 0 | ) -> Result<ProtoActionResult, Error> { |
74 | | // If we are a GrpcStore we shortcut here, as this is a special store. |
75 | 0 | if let Some(grpc_store) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest.into())) { Branch (75:12): [True: 0, False: 0]
Branch (75:12): [Folded - Ignored]
|
76 | 0 | let action_result_request = GetActionResultRequest { |
77 | 0 | instance_name, |
78 | 0 | action_digest: Some(action_digest.into()), |
79 | 0 | inline_stdout: false, |
80 | 0 | inline_stderr: false, |
81 | 0 | inline_output_files: Vec::new(), |
82 | 0 | digest_function: digest_function.proto_digest_func().into(), |
83 | 0 | }; |
84 | 0 | grpc_store |
85 | 0 | .get_action_result(Request::new(action_result_request)) |
86 | 0 | .await |
87 | 0 | .map(Response::into_inner) |
88 | | } else { |
89 | 0 | get_and_decode_digest::<ProtoActionResult>(ac_store, action_digest.into()).await |
90 | | } |
91 | 0 | } |
92 | | |
93 | | /// Future for when ActionStateResults are known. |
94 | | type ActionStateResultOneshot = oneshot::Receiver<Result<Box<dyn ActionStateResult>, Error>>; |
95 | | |
96 | 0 | fn subscribe_to_existing_action( |
97 | 0 | inflight_cache_checks: &mut MutexGuard<CheckActions>, |
98 | 0 | unique_qualifier: &ActionUniqueKey, |
99 | 0 | client_operation_id: &OperationId, |
100 | 0 | ) -> Option<ActionStateResultOneshot> { |
101 | 0 | inflight_cache_checks |
102 | 0 | .get_mut(unique_qualifier) |
103 | 0 | .map(|oneshots| { |
104 | 0 | let (tx, rx) = oneshot::channel(); |
105 | 0 | oneshots.push((client_operation_id.clone(), tx)); |
106 | 0 | rx |
107 | 0 | }) |
108 | 0 | } |
109 | | |
110 | | struct CacheLookupActionStateResult { |
111 | | action_state: Arc<ActionState>, |
112 | | change_called: bool, |
113 | | } |
114 | | |
115 | | #[async_trait] |
116 | | impl ActionStateResult for CacheLookupActionStateResult { |
117 | 0 | async fn as_state(&self) -> Result<Arc<ActionState>, Error> { |
118 | 0 | Ok(self.action_state.clone()) |
119 | 0 | } |
120 | | |
121 | 0 | async fn changed(&mut self) -> Result<Arc<ActionState>, Error> { |
122 | 0 | if self.change_called { Branch (122:12): [True: 0, False: 0]
Branch (122:12): [Folded - Ignored]
|
123 | 0 | return Err(make_err!( |
124 | 0 | Code::Internal, |
125 | 0 | "CacheLookupActionStateResult::changed called twice" |
126 | 0 | )); |
127 | 0 | } |
128 | 0 | self.change_called = true; |
129 | 0 | Ok(self.action_state.clone()) |
130 | 0 | } |
131 | | |
132 | 0 | async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> { |
133 | | // TODO(allada) We should probably remove as_action_info() |
134 | | // or implement it properly. |
135 | 0 | return Err(make_err!( |
136 | 0 | Code::Unimplemented, |
137 | 0 | "as_action_info not implemented for CacheLookupActionStateResult::as_action_info" |
138 | 0 | )); |
139 | 0 | } |
140 | | } |
141 | | |
142 | | impl CacheLookupScheduler { |
143 | 2 | pub fn new( |
144 | 2 | ac_store: Store, |
145 | 2 | action_scheduler: Arc<dyn ClientStateManager>, |
146 | 2 | ) -> Result<Self, Error> { |
147 | 2 | Ok(Self { |
148 | 2 | ac_store, |
149 | 2 | action_scheduler, |
150 | 2 | inflight_cache_checks: Default::default(), |
151 | 2 | }) |
152 | 2 | } |
153 | | |
154 | 1 | async fn inner_add_action( |
155 | 1 | &self, |
156 | 1 | client_operation_id: OperationId, |
157 | 1 | action_info: Arc<ActionInfo>, |
158 | 1 | ) -> Result<Box<dyn ActionStateResult>, Error> { |
159 | 1 | let unique_key0 = match &action_info.unique_qualifier { |
160 | 0 | ActionUniqueQualifier::Cachable(unique_key) => unique_key.clone(), |
161 | | ActionUniqueQualifier::Uncachable(_) => { |
162 | | // Cache lookup skipped, forward to the upstream. |
163 | 1 | return self |
164 | 1 | .action_scheduler |
165 | 1 | .add_action(client_operation_id, action_info) |
166 | 1 | .await; |
167 | | } |
168 | | }; |
169 | | |
170 | 0 | let cache_check_result = { |
171 | 0 | // Check this isn't a duplicate request first. |
172 | 0 | let mut inflight_cache_checks = self.inflight_cache_checks.lock(); |
173 | 0 | subscribe_to_existing_action( |
174 | 0 | &mut inflight_cache_checks, |
175 | 0 | &unique_key, |
176 | 0 | &client_operation_id, |
177 | 0 | ) |
178 | 0 | .ok_or_else(move || { |
179 | 0 | let (action_listener_tx, action_listener_rx) = oneshot::channel(); |
180 | 0 | inflight_cache_checks.insert( |
181 | 0 | unique_key.clone(), |
182 | 0 | vec![(client_operation_id, action_listener_tx)], |
183 | 0 | ); |
184 | 0 | // In the event we loose the reference to our `scope_guard`, it will remove |
185 | 0 | // the action from the inflight_cache_checks map. |
186 | 0 | let inflight_cache_checks = self.inflight_cache_checks.clone(); |
187 | 0 | ( |
188 | 0 | action_listener_rx, |
189 | 0 | guard((), move |_| { |
190 | 0 | inflight_cache_checks.lock().remove(&unique_key); |
191 | 0 | }), |
192 | 0 | ) |
193 | 0 | }) |
194 | | }; |
195 | 0 | let (action_listener_rx, scope_guard) = match cache_check_result { |
196 | 0 | Ok(action_listener_fut) => { |
197 | 0 | let action_listener = action_listener_fut.await.map_err(|_| { |
198 | 0 | make_err!( |
199 | 0 | Code::Internal, |
200 | 0 | "ActionStateResult tx hung up in CacheLookupScheduler::add_action" |
201 | 0 | ) |
202 | 0 | })?; |
203 | 0 | return action_listener; |
204 | | } |
205 | 0 | Err(client_tx_and_scope_guard) => client_tx_and_scope_guard, |
206 | 0 | }; |
207 | 0 |
|
208 | 0 | let ac_store = self.ac_store.clone(); |
209 | 0 | let action_scheduler = self.action_scheduler.clone(); |
210 | 0 | let inflight_cache_checks = self.inflight_cache_checks.clone(); |
211 | 0 | // We need this spawn because we are returning a stream and this spawn will populate the stream's data. |
212 | 0 | background_spawn!("cache_lookup_scheduler_add_action", async move { |
213 | 0 | // If our spawn ever dies, we will remove the action from the inflight_cache_checks map. |
214 | 0 | let _scope_guard = scope_guard; |
215 | | |
216 | 0 | let unique_key = match &action_info.unique_qualifier { |
217 | 0 | ActionUniqueQualifier::Cachable(unique_key) => unique_key, |
218 | 0 | ActionUniqueQualifier::Uncachable(unique_key) => { |
219 | 0 | event!( |
220 | 0 | Level::ERROR, |
221 | | ?action_info, |
222 | 0 | "ActionInfo::unique_qualifier should be ActionUniqueQualifier::Cachable()" |
223 | | ); |
224 | 0 | unique_key |
225 | | } |
226 | | }; |
227 | | |
228 | | // Perform cache check. |
229 | 0 | let instance_name = action_info.unique_qualifier.instance_name().clone(); |
230 | 0 | let maybe_action_result = get_action_from_store( |
231 | 0 | &ac_store, |
232 | 0 | action_info.unique_qualifier.digest(), |
233 | 0 | instance_name, |
234 | 0 | action_info.unique_qualifier.digest_function(), |
235 | 0 | ) |
236 | 0 | .await; |
237 | 0 | match maybe_action_result { |
238 | 0 | Ok(action_result) => { |
239 | 0 | let maybe_pending_txs = { |
240 | 0 | let mut inflight_cache_checks = inflight_cache_checks.lock(); |
241 | 0 | // We are ready to resolve the in-flight actions. We remove the |
242 | 0 | // in-flight actions from the map. |
243 | 0 | inflight_cache_checks.remove(unique_key) |
244 | | }; |
245 | 0 | let Some(pending_txs) = maybe_pending_txs else { Branch (245:25): [True: 0, False: 0]
Branch (245:25): [Folded - Ignored]
|
246 | 0 | return; // Nobody is waiting for this action anymore. |
247 | | }; |
248 | 0 | let mut action_state = ActionState { |
249 | 0 | client_operation_id: OperationId::default(), |
250 | 0 | stage: ActionStage::CompletedFromCache(action_result), |
251 | 0 | action_digest: action_info.unique_qualifier.digest(), |
252 | 0 | }; |
253 | | |
254 | 0 | for (client_operation_id, pending_tx) in pending_txs { |
255 | 0 | action_state.client_operation_id = client_operation_id; |
256 | 0 | // Ignore errors here, as the other end may have hung up. |
257 | 0 | let _ = pending_tx.send(Ok(Box::new(CacheLookupActionStateResult { |
258 | 0 | action_state: Arc::new(action_state.clone()), |
259 | 0 | change_called: false, |
260 | 0 | }))); |
261 | 0 | } |
262 | 0 | return; |
263 | | } |
264 | 0 | Err(err) => { |
265 | 0 | // NotFound errors just mean we need to execute our action. |
266 | 0 | if err.code != Code::NotFound { Branch (266:24): [True: 0, False: 0]
Branch (266:24): [Folded - Ignored]
|
267 | 0 | let err = err.append("In CacheLookupScheduler::add_action"); |
268 | 0 | let maybe_pending_txs = { |
269 | 0 | let mut inflight_cache_checks = inflight_cache_checks.lock(); |
270 | 0 | // We are ready to resolve the in-flight actions. We remove the |
271 | 0 | // in-flight actions from the map. |
272 | 0 | inflight_cache_checks.remove(unique_key) |
273 | | }; |
274 | 0 | let Some(pending_txs) = maybe_pending_txs else { Branch (274:29): [True: 0, False: 0]
Branch (274:29): [Folded - Ignored]
|
275 | 0 | return; // Nobody is waiting for this action anymore. |
276 | | }; |
277 | 0 | for (_client_operation_id, pending_tx) in pending_txs { |
278 | 0 | // Ignore errors here, as the other end may have hung up. |
279 | 0 | let _ = pending_tx.send(Err(err.clone())); |
280 | 0 | } |
281 | 0 | return; |
282 | 0 | } |
283 | 0 | } |
284 | 0 | } |
285 | 0 |
|
286 | 0 | let maybe_pending_txs = { |
287 | 0 | let mut inflight_cache_checks = inflight_cache_checks.lock(); |
288 | 0 | inflight_cache_checks.remove(unique_key) |
289 | | }; |
290 | 0 | let Some(pending_txs) = maybe_pending_txs else { Branch (290:17): [True: 0, False: 0]
Branch (290:17): [Folded - Ignored]
|
291 | 0 | return; // Noone is waiting for this action anymore. |
292 | | }; |
293 | | |
294 | 0 | for (client_operation_id, pending_tx) in pending_txs { |
295 | | // Ignore errors here, as the other end may have hung up. |
296 | 0 | let _ = pending_tx.send( |
297 | 0 | action_scheduler |
298 | 0 | .add_action(client_operation_id, action_info.clone()) |
299 | 0 | .await, |
300 | | ); |
301 | | } |
302 | 0 | }); |
303 | 0 | action_listener_rx |
304 | 0 | .await |
305 | 0 | .map_err(|_| { |
306 | 0 | make_err!( |
307 | 0 | Code::Internal, |
308 | 0 | "ActionStateResult tx hung up in CacheLookupScheduler::add_action" |
309 | 0 | ) |
310 | 0 | })? |
311 | 0 | .err_tip(|| "In CacheLookupScheduler::add_action") |
312 | 1 | } |
313 | | |
314 | 1 | async fn inner_filter_operations( |
315 | 1 | &self, |
316 | 1 | filter: OperationFilter, |
317 | 1 | ) -> Result<ActionStateResultStream, Error> { |
318 | 1 | self.action_scheduler |
319 | 1 | .filter_operations(filter) |
320 | 1 | .await |
321 | 1 | .err_tip(|| "In CacheLookupScheduler::filter_operations"0 ) |
322 | 1 | } |
323 | | } |
324 | | |
325 | | #[async_trait] |
326 | | impl ClientStateManager for CacheLookupScheduler { |
327 | | async fn add_action( |
328 | | &self, |
329 | | client_operation_id: OperationId, |
330 | | action_info: Arc<ActionInfo>, |
331 | 1 | ) -> Result<Box<dyn ActionStateResult>, Error> { |
332 | 1 | self.inner_add_action(client_operation_id, action_info) |
333 | 1 | .await |
334 | 2 | } |
335 | | |
336 | | async fn filter_operations( |
337 | | &self, |
338 | | filter: OperationFilter, |
339 | 1 | ) -> Result<ActionStateResultStream, Error> { |
340 | 1 | self.inner_filter_operations(filter).await |
341 | 2 | } |
342 | | |
343 | 0 | fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> { |
344 | 0 | self.action_scheduler.as_known_platform_property_provider() |
345 | 0 | } |
346 | | } |
347 | | |
348 | | impl RootMetricsComponent for CacheLookupScheduler {} |