/build/source/nativelink-scheduler/src/cache_lookup_scheduler.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::collections::HashMap; |
16 | | use std::sync::Arc; |
17 | | |
18 | | use async_trait::async_trait; |
19 | | use nativelink_error::{make_err, Code, Error, ResultExt}; |
20 | | use nativelink_metric::{MetricsComponent, RootMetricsComponent}; |
21 | | use nativelink_proto::build::bazel::remote::execution::v2::{ |
22 | | ActionResult as ProtoActionResult, GetActionResultRequest, |
23 | | }; |
24 | | use nativelink_store::ac_utils::get_and_decode_digest; |
25 | | use nativelink_store::grpc_store::GrpcStore; |
26 | | use nativelink_util::action_messages::{ |
27 | | ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, OperationId, |
28 | | }; |
29 | | use nativelink_util::background_spawn; |
30 | | use nativelink_util::common::DigestInfo; |
31 | | use nativelink_util::digest_hasher::DigestHasherFunc; |
32 | | use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; |
33 | | use nativelink_util::operation_state_manager::{ |
34 | | ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter, |
35 | | }; |
36 | | use nativelink_util::origin_context::ActiveOriginContext; |
37 | | use nativelink_util::origin_event::{OriginMetadata, ORIGIN_EVENT_COLLECTOR}; |
38 | | use nativelink_util::store_trait::Store; |
39 | | use parking_lot::{Mutex, MutexGuard}; |
40 | | use scopeguard::guard; |
41 | | use tokio::sync::oneshot; |
42 | | use tonic::{Request, Response}; |
43 | | use tracing::{event, Level}; |
44 | | |
45 | | /// Actions that are having their cache checked, or that failed cache lookup and are |
46 | | /// being forwarded upstream. Does not include `skip_cache_check` actions, which are |
47 | | /// forwarded directly. |
48 | | type CheckActions = HashMap< |
49 | | ActionUniqueKey, |
50 | | Vec<( |
51 | | OperationId, |
52 | | oneshot::Sender<Result<Box<dyn ActionStateResult>, Error>>, |
53 | | )>, |
54 | | >; |
55 | | |
56 | | #[derive(MetricsComponent)] |
57 | | pub struct CacheLookupScheduler { |
58 | | /// A reference to the AC to find existing actions in. |
59 | | /// To prevent unintended issues, this store should probably be a `CompletenessCheckingStore`. |
60 | | #[metric(group = "ac_store")] |
61 | | ac_store: Store, |
62 | | /// The "real" scheduler to use to perform actions if they were not found |
63 | | /// in the action cache. |
64 | | #[metric(group = "action_scheduler")] |
65 | | action_scheduler: Arc<dyn ClientStateManager>, |
66 | | /// Actions that are currently performing a `CacheCheck`. |
67 | | inflight_cache_checks: Arc<Mutex<CheckActions>>, |
68 | | } |
69 | | |
70 | 0 | async fn get_action_from_store( |
71 | 0 | ac_store: &Store, |
72 | 0 | action_digest: DigestInfo, |
73 | 0 | instance_name: String, |
74 | 0 | digest_function: DigestHasherFunc, |
75 | 0 | ) -> Result<ProtoActionResult, Error> { |
76 | | // If we are a GrpcStore we shortcut here, as this is a special store. |
77 | 0 | if let Some(grpc_store) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest.into())) { Branch (77:12): [True: 0, False: 0]
Branch (77:12): [Folded - Ignored]
|
78 | 0 | let action_result_request = GetActionResultRequest { |
79 | 0 | instance_name, |
80 | 0 | action_digest: Some(action_digest.into()), |
81 | 0 | inline_stdout: false, |
82 | 0 | inline_stderr: false, |
83 | 0 | inline_output_files: Vec::new(), |
84 | 0 | digest_function: digest_function.proto_digest_func().into(), |
85 | 0 | }; |
86 | 0 | grpc_store |
87 | 0 | .get_action_result(Request::new(action_result_request)) |
88 | 0 | .await |
89 | 0 | .map(Response::into_inner) |
90 | | } else { |
91 | 0 | get_and_decode_digest::<ProtoActionResult>(ac_store, action_digest.into()).await |
92 | | } |
93 | 0 | } |
94 | | |
95 | | /// Future for when `ActionStateResults` are known. |
96 | | type ActionStateResultOneshot = oneshot::Receiver<Result<Box<dyn ActionStateResult>, Error>>; |
97 | | |
98 | 0 | fn subscribe_to_existing_action( |
99 | 0 | inflight_cache_checks: &mut MutexGuard<CheckActions>, |
100 | 0 | unique_qualifier: &ActionUniqueKey, |
101 | 0 | client_operation_id: &OperationId, |
102 | 0 | ) -> Option<ActionStateResultOneshot> { |
103 | 0 | inflight_cache_checks |
104 | 0 | .get_mut(unique_qualifier) |
105 | 0 | .map(|oneshots| { |
106 | 0 | let (tx, rx) = oneshot::channel(); |
107 | 0 | oneshots.push((client_operation_id.clone(), tx)); |
108 | 0 | rx |
109 | 0 | }) |
110 | 0 | } |
111 | | |
112 | | struct CacheLookupActionStateResult { |
113 | | action_state: Arc<ActionState>, |
114 | | maybe_origin_metadata: Option<OriginMetadata>, |
115 | | change_called: bool, |
116 | | } |
117 | | |
118 | | #[async_trait] |
119 | | impl ActionStateResult for CacheLookupActionStateResult { |
120 | 0 | async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> { |
121 | 0 | Ok(( |
122 | 0 | self.action_state.clone(), |
123 | 0 | self.maybe_origin_metadata.clone(), |
124 | 0 | )) |
125 | 0 | } |
126 | | |
127 | 0 | async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> { |
128 | 0 | if self.change_called { Branch (128:12): [True: 0, False: 0]
Branch (128:12): [Folded - Ignored]
|
129 | 0 | return Err(make_err!( |
130 | 0 | Code::Internal, |
131 | 0 | "CacheLookupActionStateResult::changed called twice" |
132 | 0 | )); |
133 | 0 | } |
134 | 0 | self.change_called = true; |
135 | 0 | Ok(( |
136 | 0 | self.action_state.clone(), |
137 | 0 | self.maybe_origin_metadata.clone(), |
138 | 0 | )) |
139 | 0 | } |
140 | | |
141 | 0 | async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> { |
142 | | // TODO(allada) We should probably remove as_action_info() |
143 | | // or implement it properly. |
144 | 0 | return Err(make_err!( |
145 | 0 | Code::Unimplemented, |
146 | 0 | "as_action_info not implemented for CacheLookupActionStateResult::as_action_info" |
147 | 0 | )); |
148 | 0 | } |
149 | | } |
150 | | |
151 | | impl CacheLookupScheduler { |
152 | 2 | pub fn new( |
153 | 2 | ac_store: Store, |
154 | 2 | action_scheduler: Arc<dyn ClientStateManager>, |
155 | 2 | ) -> Result<Self, Error> { |
156 | 2 | Ok(Self { |
157 | 2 | ac_store, |
158 | 2 | action_scheduler, |
159 | 2 | inflight_cache_checks: Arc::default(), |
160 | 2 | }) |
161 | 2 | } |
162 | | |
163 | 1 | async fn inner_add_action( |
164 | 1 | &self, |
165 | 1 | client_operation_id: OperationId, |
166 | 1 | action_info: Arc<ActionInfo>, |
167 | 1 | ) -> Result<Box<dyn ActionStateResult>, Error> { |
168 | 1 | let unique_key0 = match &action_info.unique_qualifier { |
169 | 0 | ActionUniqueQualifier::Cachable(unique_key) => unique_key.clone(), |
170 | | ActionUniqueQualifier::Uncachable(_) => { |
171 | | // Cache lookup skipped, forward to the upstream. |
172 | 1 | return self |
173 | 1 | .action_scheduler |
174 | 1 | .add_action(client_operation_id, action_info) |
175 | 1 | .await; |
176 | | } |
177 | | }; |
178 | | |
179 | 0 | let cache_check_result = { |
180 | 0 | // Check this isn't a duplicate request first. |
181 | 0 | let mut inflight_cache_checks = self.inflight_cache_checks.lock(); |
182 | 0 | subscribe_to_existing_action( |
183 | 0 | &mut inflight_cache_checks, |
184 | 0 | &unique_key, |
185 | 0 | &client_operation_id, |
186 | 0 | ) |
187 | 0 | .ok_or_else(move || { |
188 | 0 | let (action_listener_tx, action_listener_rx) = oneshot::channel(); |
189 | 0 | inflight_cache_checks.insert( |
190 | 0 | unique_key.clone(), |
191 | 0 | vec![(client_operation_id, action_listener_tx)], |
192 | 0 | ); |
193 | 0 | // In the event we lose the reference to our `scope_guard`, it will remove |
194 | 0 | // the action from the inflight_cache_checks map. |
195 | 0 | let inflight_cache_checks = self.inflight_cache_checks.clone(); |
196 | 0 | ( |
197 | 0 | action_listener_rx, |
198 | 0 | guard((), move |()| { |
199 | 0 | inflight_cache_checks.lock().remove(&unique_key); |
200 | 0 | }), |
201 | 0 | ) |
202 | 0 | }) |
203 | | }; |
204 | 0 | let (action_listener_rx, scope_guard) = match cache_check_result { |
205 | 0 | Ok(action_listener_fut) => { |
206 | 0 | let action_listener = action_listener_fut.await.map_err(|_| { |
207 | 0 | make_err!( |
208 | 0 | Code::Internal, |
209 | 0 | "ActionStateResult tx hung up in CacheLookupScheduler::add_action" |
210 | 0 | ) |
211 | 0 | })?; |
212 | 0 | return action_listener; |
213 | | } |
214 | 0 | Err(client_tx_and_scope_guard) => client_tx_and_scope_guard, |
215 | 0 | }; |
216 | 0 |
|
217 | 0 | let ac_store = self.ac_store.clone(); |
218 | 0 | let action_scheduler = self.action_scheduler.clone(); |
219 | 0 | let inflight_cache_checks = self.inflight_cache_checks.clone(); |
220 | 0 | // We need this spawn because we are returning a stream and this spawn will populate the stream's data. |
221 | 0 | background_spawn!("cache_lookup_scheduler_add_action", async move { |
222 | 0 | // If our spawn ever dies, we will remove the action from the inflight_cache_checks map. |
223 | 0 | let _scope_guard = scope_guard; |
224 | | |
225 | 0 | let unique_key = match &action_info.unique_qualifier { |
226 | 0 | ActionUniqueQualifier::Cachable(unique_key) => unique_key, |
227 | 0 | ActionUniqueQualifier::Uncachable(unique_key) => { |
228 | 0 | event!( |
229 | 0 | Level::ERROR, |
230 | | ?action_info, |
231 | 0 | "ActionInfo::unique_qualifier should be ActionUniqueQualifier::Cachable()" |
232 | | ); |
233 | 0 | unique_key |
234 | | } |
235 | | }; |
236 | | |
237 | | // Perform cache check. |
238 | 0 | let instance_name = action_info.unique_qualifier.instance_name().clone(); |
239 | 0 | let maybe_action_result = get_action_from_store( |
240 | 0 | &ac_store, |
241 | 0 | action_info.unique_qualifier.digest(), |
242 | 0 | instance_name, |
243 | 0 | action_info.unique_qualifier.digest_function(), |
244 | 0 | ) |
245 | 0 | .await; |
246 | 0 | match maybe_action_result { |
247 | 0 | Ok(action_result) => { |
248 | 0 | let maybe_pending_txs = { |
249 | 0 | let mut inflight_cache_checks = inflight_cache_checks.lock(); |
250 | 0 | // We are ready to resolve the in-flight actions. We remove the |
251 | 0 | // in-flight actions from the map. |
252 | 0 | inflight_cache_checks.remove(unique_key) |
253 | | }; |
254 | 0 | let Some(pending_txs) = maybe_pending_txs else { Branch (254:25): [True: 0, False: 0]
Branch (254:25): [Folded - Ignored]
|
255 | 0 | return; // Nobody is waiting for this action anymore. |
256 | | }; |
257 | 0 | let mut action_state = ActionState { |
258 | 0 | client_operation_id: OperationId::default(), |
259 | 0 | stage: ActionStage::CompletedFromCache(action_result), |
260 | 0 | action_digest: action_info.unique_qualifier.digest(), |
261 | 0 | }; |
262 | 0 |
|
263 | 0 | let maybe_origin_metadata = |
264 | 0 | ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR) |
265 | 0 | .ok() |
266 | 0 | .flatten() |
267 | 0 | .map(|v| v.metadata.clone()); |
268 | 0 | for (client_operation_id, pending_tx) in pending_txs { |
269 | 0 | action_state.client_operation_id = client_operation_id; |
270 | 0 | // Ignore errors here, as the other end may have hung up. |
271 | 0 | let _ = pending_tx.send(Ok(Box::new(CacheLookupActionStateResult { |
272 | 0 | action_state: Arc::new(action_state.clone()), |
273 | 0 | maybe_origin_metadata: maybe_origin_metadata.clone(), |
274 | 0 | change_called: false, |
275 | 0 | }))); |
276 | 0 | } |
277 | 0 | return; |
278 | | } |
279 | 0 | Err(err) => { |
280 | 0 | // NotFound errors just mean we need to execute our action. |
281 | 0 | if err.code != Code::NotFound { Branch (281:24): [True: 0, False: 0]
Branch (281:24): [Folded - Ignored]
|
282 | 0 | let err = err.append("In CacheLookupScheduler::add_action"); |
283 | 0 | let maybe_pending_txs = { |
284 | 0 | let mut inflight_cache_checks = inflight_cache_checks.lock(); |
285 | 0 | // We are ready to resolve the in-flight actions. We remove the |
286 | 0 | // in-flight actions from the map. |
287 | 0 | inflight_cache_checks.remove(unique_key) |
288 | | }; |
289 | 0 | let Some(pending_txs) = maybe_pending_txs else { Branch (289:29): [True: 0, False: 0]
Branch (289:29): [Folded - Ignored]
|
290 | 0 | return; // Nobody is waiting for this action anymore. |
291 | | }; |
292 | 0 | for (_client_operation_id, pending_tx) in pending_txs { |
293 | 0 | // Ignore errors here, as the other end may have hung up. |
294 | 0 | let _ = pending_tx.send(Err(err.clone())); |
295 | 0 | } |
296 | 0 | return; |
297 | 0 | } |
298 | 0 | } |
299 | 0 | } |
300 | 0 |
|
301 | 0 | let maybe_pending_txs = { |
302 | 0 | let mut inflight_cache_checks = inflight_cache_checks.lock(); |
303 | 0 | inflight_cache_checks.remove(unique_key) |
304 | | }; |
305 | 0 | let Some(pending_txs) = maybe_pending_txs else { Branch (305:17): [True: 0, False: 0]
Branch (305:17): [Folded - Ignored]
|
306 | 0 | return; // No one is waiting for this action anymore. |
307 | | }; |
308 | | |
309 | 0 | for (client_operation_id, pending_tx) in pending_txs { |
310 | | // Ignore errors here, as the other end may have hung up. |
311 | 0 | let _ = pending_tx.send( |
312 | 0 | action_scheduler |
313 | 0 | .add_action(client_operation_id, action_info.clone()) |
314 | 0 | .await, |
315 | | ); |
316 | | } |
317 | 0 | }); |
318 | 0 | action_listener_rx |
319 | 0 | .await |
320 | 0 | .map_err(|_| { |
321 | 0 | make_err!( |
322 | 0 | Code::Internal, |
323 | 0 | "ActionStateResult tx hung up in CacheLookupScheduler::add_action" |
324 | 0 | ) |
325 | 0 | })? |
326 | 0 | .err_tip(|| "In CacheLookupScheduler::add_action") |
327 | 1 | } |
328 | | |
329 | 1 | async fn inner_filter_operations( |
330 | 1 | &self, |
331 | 1 | filter: OperationFilter, |
332 | 1 | ) -> Result<ActionStateResultStream, Error> { |
333 | 1 | self.action_scheduler |
334 | 1 | .filter_operations(filter) |
335 | 1 | .await |
336 | 1 | .err_tip(|| "In CacheLookupScheduler::filter_operations"0 ) |
337 | 1 | } |
338 | | } |
339 | | |
340 | | #[async_trait] |
341 | | impl ClientStateManager for CacheLookupScheduler { |
342 | | async fn add_action( |
343 | | &self, |
344 | | client_operation_id: OperationId, |
345 | | action_info: Arc<ActionInfo>, |
346 | 1 | ) -> Result<Box<dyn ActionStateResult>, Error> { |
347 | 1 | self.inner_add_action(client_operation_id, action_info) |
348 | 1 | .await |
349 | 2 | } |
350 | | |
351 | | async fn filter_operations( |
352 | | &self, |
353 | | filter: OperationFilter, |
354 | 1 | ) -> Result<ActionStateResultStream, Error> { |
355 | 1 | self.inner_filter_operations(filter).await |
356 | 2 | } |
357 | | |
358 | 0 | fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> { |
359 | 0 | self.action_scheduler.as_known_platform_property_provider() |
360 | 0 | } |
361 | | } |
362 | | |
363 | | impl RootMetricsComponent for CacheLookupScheduler {} |