Coverage Report

Created: 2025-03-08 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/cache_lookup_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
use std::sync::Arc;
17
18
use async_trait::async_trait;
19
use nativelink_error::{make_err, Code, Error, ResultExt};
20
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
21
use nativelink_proto::build::bazel::remote::execution::v2::{
22
    ActionResult as ProtoActionResult, GetActionResultRequest,
23
};
24
use nativelink_store::ac_utils::get_and_decode_digest;
25
use nativelink_store::grpc_store::GrpcStore;
26
use nativelink_util::action_messages::{
27
    ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, OperationId,
28
};
29
use nativelink_util::background_spawn;
30
use nativelink_util::common::DigestInfo;
31
use nativelink_util::digest_hasher::DigestHasherFunc;
32
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
33
use nativelink_util::operation_state_manager::{
34
    ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
35
};
36
use nativelink_util::origin_context::ActiveOriginContext;
37
use nativelink_util::origin_event::{OriginMetadata, ORIGIN_EVENT_COLLECTOR};
38
use nativelink_util::store_trait::Store;
39
use parking_lot::{Mutex, MutexGuard};
40
use scopeguard::guard;
41
use tokio::sync::oneshot;
42
use tonic::{Request, Response};
43
use tracing::{event, Level};
44
45
/// Actions that are having their cache checked or failed cache lookup and are
46
/// being forwarded upstream.  Missing the `skip_cache_check` actions which are
47
/// forwarded directly.
48
type CheckActions = HashMap<
49
    ActionUniqueKey,
50
    Vec<(
51
        OperationId,
52
        oneshot::Sender<Result<Box<dyn ActionStateResult>, Error>>,
53
    )>,
54
>;
55
56
#[derive(MetricsComponent)]
57
pub struct CacheLookupScheduler {
58
    /// A reference to the AC to find existing actions in.
59
    /// To prevent unintended issues, this store should probably be a `CompletenessCheckingStore`.
60
    #[metric(group = "ac_store")]
61
    ac_store: Store,
62
    /// The "real" scheduler to use to perform actions if they were not found
63
    /// in the action cache.
64
    #[metric(group = "action_scheduler")]
65
    action_scheduler: Arc<dyn ClientStateManager>,
66
    /// Actions that are currently performing a `CacheCheck`.
67
    inflight_cache_checks: Arc<Mutex<CheckActions>>,
68
}
69
70
0
async fn get_action_from_store(
71
0
    ac_store: &Store,
72
0
    action_digest: DigestInfo,
73
0
    instance_name: String,
74
0
    digest_function: DigestHasherFunc,
75
0
) -> Result<ProtoActionResult, Error> {
76
    // If we are a GrpcStore we shortcut here, as this is a special store.
77
0
    if let Some(grpc_store) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest.into())) {
  Branch (77:12): [True: 0, False: 0]
  Branch (77:12): [Folded - Ignored]
78
0
        let action_result_request = GetActionResultRequest {
79
0
            instance_name,
80
0
            action_digest: Some(action_digest.into()),
81
0
            inline_stdout: false,
82
0
            inline_stderr: false,
83
0
            inline_output_files: Vec::new(),
84
0
            digest_function: digest_function.proto_digest_func().into(),
85
0
        };
86
0
        grpc_store
87
0
            .get_action_result(Request::new(action_result_request))
88
0
            .await
89
0
            .map(Response::into_inner)
90
    } else {
91
0
        get_and_decode_digest::<ProtoActionResult>(ac_store, action_digest.into()).await
92
    }
93
0
}
94
95
/// Future for when `ActionStateResults` are known.
96
type ActionStateResultOneshot = oneshot::Receiver<Result<Box<dyn ActionStateResult>, Error>>;
97
98
0
fn subscribe_to_existing_action(
99
0
    inflight_cache_checks: &mut MutexGuard<CheckActions>,
100
0
    unique_qualifier: &ActionUniqueKey,
101
0
    client_operation_id: &OperationId,
102
0
) -> Option<ActionStateResultOneshot> {
103
0
    inflight_cache_checks
104
0
        .get_mut(unique_qualifier)
105
0
        .map(|oneshots| {
106
0
            let (tx, rx) = oneshot::channel();
107
0
            oneshots.push((client_operation_id.clone(), tx));
108
0
            rx
109
0
        })
110
0
}
111
112
struct CacheLookupActionStateResult {
113
    action_state: Arc<ActionState>,
114
    maybe_origin_metadata: Option<OriginMetadata>,
115
    change_called: bool,
116
}
117
118
#[async_trait]
119
impl ActionStateResult for CacheLookupActionStateResult {
120
0
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
121
0
        Ok((
122
0
            self.action_state.clone(),
123
0
            self.maybe_origin_metadata.clone(),
124
0
        ))
125
0
    }
126
127
0
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
128
0
        if self.change_called {
  Branch (128:12): [True: 0, False: 0]
  Branch (128:12): [Folded - Ignored]
129
0
            return Err(make_err!(
130
0
                Code::Internal,
131
0
                "CacheLookupActionStateResult::changed called twice"
132
0
            ));
133
0
        }
134
0
        self.change_called = true;
135
0
        Ok((
136
0
            self.action_state.clone(),
137
0
            self.maybe_origin_metadata.clone(),
138
0
        ))
139
0
    }
140
141
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
142
        // TODO(allada) We should probably remove as_action_info()
143
        // or implement it properly.
144
0
        return Err(make_err!(
145
0
            Code::Unimplemented,
146
0
            "as_action_info not implemented for CacheLookupActionStateResult::as_action_info"
147
0
        ));
148
0
    }
149
}
150
151
impl CacheLookupScheduler {
152
2
    pub fn new(
153
2
        ac_store: Store,
154
2
        action_scheduler: Arc<dyn ClientStateManager>,
155
2
    ) -> Result<Self, Error> {
156
2
        Ok(Self {
157
2
            ac_store,
158
2
            action_scheduler,
159
2
            inflight_cache_checks: Arc::default(),
160
2
        })
161
2
    }
162
163
1
    async fn inner_add_action(
164
1
        &self,
165
1
        client_operation_id: OperationId,
166
1
        action_info: Arc<ActionInfo>,
167
1
    ) -> Result<Box<dyn ActionStateResult>, Error> {
168
1
        let 
unique_key0
= match &action_info.unique_qualifier {
169
0
            ActionUniqueQualifier::Cachable(unique_key) => unique_key.clone(),
170
            ActionUniqueQualifier::Uncachable(_) => {
171
                // Cache lookup skipped, forward to the upstream.
172
1
                return self
173
1
                    .action_scheduler
174
1
                    .add_action(client_operation_id, action_info)
175
1
                    .await;
176
            }
177
        };
178
179
0
        let cache_check_result = {
180
0
            // Check this isn't a duplicate request first.
181
0
            let mut inflight_cache_checks = self.inflight_cache_checks.lock();
182
0
            subscribe_to_existing_action(
183
0
                &mut inflight_cache_checks,
184
0
                &unique_key,
185
0
                &client_operation_id,
186
0
            )
187
0
            .ok_or_else(move || {
188
0
                let (action_listener_tx, action_listener_rx) = oneshot::channel();
189
0
                inflight_cache_checks.insert(
190
0
                    unique_key.clone(),
191
0
                    vec![(client_operation_id, action_listener_tx)],
192
0
                );
193
0
                // In the event we loose the reference to our `scope_guard`, it will remove
194
0
                // the action from the inflight_cache_checks map.
195
0
                let inflight_cache_checks = self.inflight_cache_checks.clone();
196
0
                (
197
0
                    action_listener_rx,
198
0
                    guard((), move |()| {
199
0
                        inflight_cache_checks.lock().remove(&unique_key);
200
0
                    }),
201
0
                )
202
0
            })
203
        };
204
0
        let (action_listener_rx, scope_guard) = match cache_check_result {
205
0
            Ok(action_listener_fut) => {
206
0
                let action_listener = action_listener_fut.await.map_err(|_| {
207
0
                    make_err!(
208
0
                        Code::Internal,
209
0
                        "ActionStateResult tx hung up in CacheLookupScheduler::add_action"
210
0
                    )
211
0
                })?;
212
0
                return action_listener;
213
            }
214
0
            Err(client_tx_and_scope_guard) => client_tx_and_scope_guard,
215
0
        };
216
0
217
0
        let ac_store = self.ac_store.clone();
218
0
        let action_scheduler = self.action_scheduler.clone();
219
0
        let inflight_cache_checks = self.inflight_cache_checks.clone();
220
0
        // We need this spawn because we are returning a stream and this spawn will populate the stream's data.
221
0
        background_spawn!("cache_lookup_scheduler_add_action", async move {
222
0
            // If our spawn ever dies, we will remove the action from the inflight_cache_checks map.
223
0
            let _scope_guard = scope_guard;
224
225
0
            let unique_key = match &action_info.unique_qualifier {
226
0
                ActionUniqueQualifier::Cachable(unique_key) => unique_key,
227
0
                ActionUniqueQualifier::Uncachable(unique_key) => {
228
0
                    event!(
229
0
                        Level::ERROR,
230
                        ?action_info,
231
0
                        "ActionInfo::unique_qualifier should be ActionUniqueQualifier::Cachable()"
232
                    );
233
0
                    unique_key
234
                }
235
            };
236
237
            // Perform cache check.
238
0
            let instance_name = action_info.unique_qualifier.instance_name().clone();
239
0
            let maybe_action_result = get_action_from_store(
240
0
                &ac_store,
241
0
                action_info.unique_qualifier.digest(),
242
0
                instance_name,
243
0
                action_info.unique_qualifier.digest_function(),
244
0
            )
245
0
            .await;
246
0
            match maybe_action_result {
247
0
                Ok(action_result) => {
248
0
                    let maybe_pending_txs = {
249
0
                        let mut inflight_cache_checks = inflight_cache_checks.lock();
250
0
                        // We are ready to resolve the in-flight actions. We remove the
251
0
                        // in-flight actions from the map.
252
0
                        inflight_cache_checks.remove(unique_key)
253
                    };
254
0
                    let Some(pending_txs) = maybe_pending_txs else {
  Branch (254:25): [True: 0, False: 0]
  Branch (254:25): [Folded - Ignored]
255
0
                        return; // Nobody is waiting for this action anymore.
256
                    };
257
0
                    let mut action_state = ActionState {
258
0
                        client_operation_id: OperationId::default(),
259
0
                        stage: ActionStage::CompletedFromCache(action_result),
260
0
                        action_digest: action_info.unique_qualifier.digest(),
261
0
                    };
262
0
263
0
                    let maybe_origin_metadata =
264
0
                        ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR)
265
0
                            .ok()
266
0
                            .flatten()
267
0
                            .map(|v| v.metadata.clone());
268
0
                    for (client_operation_id, pending_tx) in pending_txs {
269
0
                        action_state.client_operation_id = client_operation_id;
270
0
                        // Ignore errors here, as the other end may have hung up.
271
0
                        let _ = pending_tx.send(Ok(Box::new(CacheLookupActionStateResult {
272
0
                            action_state: Arc::new(action_state.clone()),
273
0
                            maybe_origin_metadata: maybe_origin_metadata.clone(),
274
0
                            change_called: false,
275
0
                        })));
276
0
                    }
277
0
                    return;
278
                }
279
0
                Err(err) => {
280
0
                    // NotFound errors just mean we need to execute our action.
281
0
                    if err.code != Code::NotFound {
  Branch (281:24): [True: 0, False: 0]
  Branch (281:24): [Folded - Ignored]
282
0
                        let err = err.append("In CacheLookupScheduler::add_action");
283
0
                        let maybe_pending_txs = {
284
0
                            let mut inflight_cache_checks = inflight_cache_checks.lock();
285
0
                            // We are ready to resolve the in-flight actions. We remove the
286
0
                            // in-flight actions from the map.
287
0
                            inflight_cache_checks.remove(unique_key)
288
                        };
289
0
                        let Some(pending_txs) = maybe_pending_txs else {
  Branch (289:29): [True: 0, False: 0]
  Branch (289:29): [Folded - Ignored]
290
0
                            return; // Nobody is waiting for this action anymore.
291
                        };
292
0
                        for (_client_operation_id, pending_tx) in pending_txs {
293
0
                            // Ignore errors here, as the other end may have hung up.
294
0
                            let _ = pending_tx.send(Err(err.clone()));
295
0
                        }
296
0
                        return;
297
0
                    }
298
0
                }
299
0
            }
300
0
301
0
            let maybe_pending_txs = {
302
0
                let mut inflight_cache_checks = inflight_cache_checks.lock();
303
0
                inflight_cache_checks.remove(unique_key)
304
            };
305
0
            let Some(pending_txs) = maybe_pending_txs else {
  Branch (305:17): [True: 0, False: 0]
  Branch (305:17): [Folded - Ignored]
306
0
                return; // Noone is waiting for this action anymore.
307
            };
308
309
0
            for (client_operation_id, pending_tx) in pending_txs {
310
                // Ignore errors here, as the other end may have hung up.
311
0
                let _ = pending_tx.send(
312
0
                    action_scheduler
313
0
                        .add_action(client_operation_id, action_info.clone())
314
0
                        .await,
315
                );
316
            }
317
0
        });
318
0
        action_listener_rx
319
0
            .await
320
0
            .map_err(|_| {
321
0
                make_err!(
322
0
                    Code::Internal,
323
0
                    "ActionStateResult tx hung up in CacheLookupScheduler::add_action"
324
0
                )
325
0
            })?
326
0
            .err_tip(|| "In CacheLookupScheduler::add_action")
327
1
    }
328
329
1
    async fn inner_filter_operations(
330
1
        &self,
331
1
        filter: OperationFilter,
332
1
    ) -> Result<ActionStateResultStream, Error> {
333
1
        self.action_scheduler
334
1
            .filter_operations(filter)
335
1
            .await
336
1
            .err_tip(|| 
"In CacheLookupScheduler::filter_operations"0
)
337
1
    }
338
}
339
340
#[async_trait]
341
impl ClientStateManager for CacheLookupScheduler {
342
    async fn add_action(
343
        &self,
344
        client_operation_id: OperationId,
345
        action_info: Arc<ActionInfo>,
346
1
    ) -> Result<Box<dyn ActionStateResult>, Error> {
347
1
        self.inner_add_action(client_operation_id, action_info)
348
1
            .await
349
2
    }
350
351
    async fn filter_operations(
352
        &self,
353
        filter: OperationFilter,
354
1
    ) -> Result<ActionStateResultStream, Error> {
355
1
        self.inner_filter_operations(filter).await
356
2
    }
357
358
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
359
0
        self.action_scheduler.as_known_platform_property_provider()
360
0
    }
361
}
362
363
// Empty impl: nothing from `RootMetricsComponent` is overridden here.
// NOTE(review): presumably this marks the scheduler as usable as a metrics
// root — confirm against `nativelink_metric`.
impl RootMetricsComponent for CacheLookupScheduler {}