Coverage Report

Created: 2026-06-23 15:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/cache_lookup_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
use std::sync::Arc;
17
use std::time::SystemTime;
18
19
use async_trait::async_trait;
20
use nativelink_error::{Code, Error, ResultExt, make_err};
21
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
22
use nativelink_proto::build::bazel::remote::execution::v2::{
23
    ActionResult as ProtoActionResult, GetActionResultRequest,
24
};
25
use nativelink_store::ac_utils::get_and_decode_digest;
26
use nativelink_store::grpc_store::GrpcStore;
27
use nativelink_util::action_messages::{
28
    ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, OperationId,
29
};
30
use nativelink_util::background_spawn;
31
use nativelink_util::common::DigestInfo;
32
use nativelink_util::digest_hasher::DigestHasherFunc;
33
use nativelink_util::operation_state_manager::{
34
    ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
35
};
36
use nativelink_util::origin_event::{
37
    BAZEL_METADATA_KEY, OriginMetadata, request_metadata_from_baggage,
38
};
39
use nativelink_util::store_trait::Store;
40
use opentelemetry::baggage::BaggageExt;
41
use opentelemetry::context::Context;
42
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
43
use parking_lot::{Mutex, MutexGuard};
44
use scopeguard::guard;
45
use tokio::sync::oneshot;
46
use tonic::{Request, Response};
47
use tracing::error;
48
49
use crate::known_platform_property_provider::KnownPlatformPropertyProvider;
50
51
/// Actions whose cache lookup is currently in flight, keyed by their unique key.
52
/// Each entry lists the waiting clients as `(client operation id, reply sender)`
53
/// pairs.  Uncacheable actions never appear here; they are forwarded upstream directly.
54
type CheckActions = HashMap<
55
    ActionUniqueKey,
56
    Vec<(
57
        OperationId,
58
        oneshot::Sender<Result<Box<dyn ActionStateResult>, Error>>,
59
    )>,
60
>;
61
62
/// Scheduler wrapper that looks cacheable actions up in `ac_store` first and
/// forwards them to `action_scheduler` when the lookup reports `NotFound`.
/// Uncacheable actions are forwarded without a lookup.
#[derive(MetricsComponent)]
63
pub struct CacheLookupScheduler {
64
    /// A reference to the AC to find existing actions in.
65
    /// To prevent unintended issues, this store should probably be a `CompletenessCheckingStore`.
66
    #[metric(group = "ac_store")]
67
    ac_store: Store,
68
    /// The "real" scheduler to use to perform actions if they were not found
69
    /// in the action cache.
70
    #[metric(group = "action_scheduler")]
71
    action_scheduler: Arc<dyn KnownPlatformPropertyProvider>,
72
    /// Actions that are currently performing a `CacheCheck`, shared with the
    /// background lookup tasks spawned by `inner_add_action`.
73
    inflight_cache_checks: Arc<Mutex<CheckActions>>,
74
}
75
76
impl core::fmt::Debug for CacheLookupScheduler {
77
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
78
0
        f.debug_struct("CacheLookupScheduler")
79
0
            .field("ac_store", &self.ac_store)
80
0
            .finish_non_exhaustive()
81
0
    }
82
}
83
84
0
async fn get_action_from_store(
85
0
    ac_store: &Store,
86
0
    action_digest: DigestInfo,
87
0
    instance_name: String,
88
0
    digest_function: DigestHasherFunc,
89
0
) -> Result<ProtoActionResult, Error> {
90
    // If we are a GrpcStore we shortcut here, as this is a special store.
91
0
    if let Some(grpc_store) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest.into())) {
92
0
        let action_result_request = GetActionResultRequest {
93
0
            instance_name,
94
0
            action_digest: Some(action_digest.into()),
95
0
            inline_stdout: false,
96
0
            inline_stderr: false,
97
0
            inline_output_files: Vec::new(),
98
0
            digest_function: digest_function.proto_digest_func().into(),
99
0
        };
100
0
        grpc_store
101
0
            .get_action_result(Request::new(action_result_request))
102
0
            .await
103
0
            .map(Response::into_inner)
104
    } else {
105
0
        get_and_decode_digest::<ProtoActionResult>(ac_store, action_digest.into()).await
106
    }
107
0
}
108
109
/// Receiving half of the oneshot channel on which a waiting client is handed
/// the `ActionStateResult` (or the error) once it is known.
110
type ActionStateResultOneshot = oneshot::Receiver<Result<Box<dyn ActionStateResult>, Error>>;
111
112
0
fn subscribe_to_existing_action(
113
0
    inflight_cache_checks: &mut MutexGuard<CheckActions>,
114
0
    unique_qualifier: &ActionUniqueKey,
115
0
    client_operation_id: &OperationId,
116
0
) -> Option<ActionStateResultOneshot> {
117
0
    inflight_cache_checks
118
0
        .get_mut(unique_qualifier)
119
0
        .map(|oneshots| {
120
0
            let (tx, rx) = oneshot::channel();
121
0
            oneshots.push((client_operation_id.clone(), tx));
122
0
            rx
123
0
        })
124
0
}
125
126
/// `ActionStateResult` handed to waiting clients when the action's result was
/// found in the action cache.
struct CacheLookupActionStateResult {
127
    /// The state returned by `as_state` and `changed`.
    action_state: Arc<ActionState>,
128
    /// Origin metadata returned alongside the state, if any was captured.
    maybe_origin_metadata: Option<OriginMetadata>,
129
    /// Set by the first `changed` call; a second call returns an error.
    change_called: bool,
130
}
131
132
#[async_trait]
133
impl ActionStateResult for CacheLookupActionStateResult {
134
0
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
135
        Ok((
136
            self.action_state.clone(),
137
            self.maybe_origin_metadata.clone(),
138
        ))
139
0
    }
140
141
0
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
142
        if self.change_called {
143
            return Err(make_err!(
144
                Code::Internal,
145
                "CacheLookupActionStateResult::changed called twice"
146
            ));
147
        }
148
        self.change_called = true;
149
        Ok((
150
            self.action_state.clone(),
151
            self.maybe_origin_metadata.clone(),
152
        ))
153
0
    }
154
155
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
156
        // TODO(palfrey) We should probably remove as_action_info()
157
        // or implement it properly.
158
        return Err(make_err!(
159
            Code::Unimplemented,
160
            "as_action_info not implemented for CacheLookupActionStateResult::as_action_info"
161
        ));
162
0
    }
163
}
164
165
impl CacheLookupScheduler {
166
2
    pub fn new(
167
2
        ac_store: Store,
168
2
        action_scheduler: Arc<dyn KnownPlatformPropertyProvider>,
169
2
    ) -> Result<Self, Error> {
170
2
        Ok(Self {
171
2
            ac_store,
172
2
            action_scheduler,
173
2
            inflight_cache_checks: Arc::default(),
174
2
        })
175
2
    }
176
177
1
    async fn inner_add_action(
178
1
        &self,
179
1
        client_operation_id: OperationId,
180
1
        action_info: Arc<ActionInfo>,
181
1
    ) -> Result<Box<dyn ActionStateResult>, Error> {
182
1
        let 
unique_key0
= match &action_info.unique_qualifier {
183
0
            ActionUniqueQualifier::Cacheable(unique_key) => unique_key.clone(),
184
            ActionUniqueQualifier::Uncacheable(_) => {
185
                // Cache lookup skipped, forward to the upstream.
186
1
                return self
187
1
                    .action_scheduler
188
1
                    .add_action(client_operation_id, action_info)
189
1
                    .await;
190
            }
191
        };
192
193
0
        let cache_check_result = {
194
            // Check this isn't a duplicate request first.
195
0
            let mut inflight_cache_checks = self.inflight_cache_checks.lock();
196
0
            subscribe_to_existing_action(
197
0
                &mut inflight_cache_checks,
198
0
                &unique_key,
199
0
                &client_operation_id,
200
            )
201
0
            .ok_or_else(move || {
202
0
                let (action_listener_tx, action_listener_rx) = oneshot::channel();
203
0
                inflight_cache_checks.insert(
204
0
                    unique_key.clone(),
205
0
                    vec![(client_operation_id, action_listener_tx)],
206
0
                );
207
                // In the event we loose the reference to our `scope_guard`, it will remove
208
                // the action from the inflight_cache_checks map.
209
0
                let inflight_cache_checks = self.inflight_cache_checks.clone();
210
                (
211
0
                    action_listener_rx,
212
0
                    guard((), move |()| {
213
0
                        inflight_cache_checks.lock().remove(&unique_key);
214
0
                    }),
215
                )
216
0
            })
217
        };
218
0
        let (action_listener_rx, scope_guard) = match cache_check_result {
219
0
            Ok(action_listener_fut) => {
220
0
                let action_listener = action_listener_fut.await.map_err(|err| {
221
0
                    Error::from_std_err(Code::Internal, &err)
222
0
                        .append("ActionStateResult tx hung up in CacheLookupScheduler::add_action")
223
0
                })?;
224
0
                return action_listener;
225
            }
226
0
            Err(client_tx_and_scope_guard) => client_tx_and_scope_guard,
227
        };
228
229
0
        let ac_store = self.ac_store.clone();
230
0
        let action_scheduler = self.action_scheduler.clone();
231
0
        let inflight_cache_checks = self.inflight_cache_checks.clone();
232
        // We need this spawn because we are returning a stream and this spawn will populate the stream's data.
233
0
        background_spawn!("cache_lookup_scheduler_add_action", async move {
234
            // If our spawn ever dies, we will remove the action from the inflight_cache_checks map.
235
0
            let _scope_guard = scope_guard;
236
237
0
            let unique_key = match &action_info.unique_qualifier {
238
0
                ActionUniqueQualifier::Cacheable(unique_key) => unique_key,
239
0
                ActionUniqueQualifier::Uncacheable(unique_key) => {
240
0
                    error!(
241
                        ?action_info,
242
                        "ActionInfo::unique_qualifier should be ActionUniqueQualifier::Cacheable()"
243
                    );
244
0
                    unique_key
245
                }
246
            };
247
248
            // Perform cache check.
249
0
            let instance_name = action_info.unique_qualifier.instance_name().clone();
250
0
            let maybe_action_result = get_action_from_store(
251
0
                &ac_store,
252
0
                action_info.unique_qualifier.digest(),
253
0
                instance_name,
254
0
                action_info.unique_qualifier.digest_function(),
255
0
            )
256
0
            .await;
257
0
            match maybe_action_result {
258
0
                Ok(action_result) => {
259
0
                    let maybe_pending_txs = {
260
0
                        let mut inflight_cache_checks = inflight_cache_checks.lock();
261
                        // We are ready to resolve the in-flight actions. We remove the
262
                        // in-flight actions from the map.
263
0
                        inflight_cache_checks.remove(unique_key)
264
                    };
265
0
                    let Some(pending_txs) = maybe_pending_txs else {
266
0
                        return; // Nobody is waiting for this action anymore.
267
                    };
268
0
                    let mut action_state = ActionState {
269
0
                        client_operation_id: OperationId::default(),
270
0
                        stage: ActionStage::CompletedFromCache(action_result),
271
0
                        action_digest: action_info.unique_qualifier.digest(),
272
0
                        last_transition_timestamp: SystemTime::now(),
273
0
                    };
274
275
0
                    let ctx = Context::current();
276
0
                    let baggage = ctx.baggage();
277
278
0
                    let maybe_origin_metadata = if baggage.is_empty() {
279
0
                        None
280
                    } else {
281
0
                        let bazel_metadata = baggage
282
0
                            .get(BAZEL_METADATA_KEY)
283
0
                            .and_then(|value| request_metadata_from_baggage(value.as_str()).ok());
284
                        Some(OriginMetadata {
285
0
                            identity: baggage
286
0
                                .get(ENDUSER_ID)
287
0
                                .map(|v| v.as_str().to_string())
288
0
                                .unwrap_or_default(),
289
0
                            bazel_metadata,
290
                        })
291
                    };
292
293
0
                    for (client_operation_id, pending_tx) in pending_txs {
294
0
                        action_state.client_operation_id = client_operation_id;
295
0
                        // Ignore errors here, as the other end may have hung up.
296
0
                        drop(pending_tx.send(Ok(Box::new(CacheLookupActionStateResult {
297
0
                            action_state: Arc::new(action_state.clone()),
298
0
                            maybe_origin_metadata: maybe_origin_metadata.clone(),
299
0
                            change_called: false,
300
0
                        }))));
301
0
                    }
302
0
                    return;
303
                }
304
0
                Err(err) => {
305
                    // NotFound errors just mean we need to execute our action.
306
0
                    if err.code != Code::NotFound {
307
0
                        let err = err.append("In CacheLookupScheduler::add_action");
308
0
                        let maybe_pending_txs = {
309
0
                            let mut inflight_cache_checks = inflight_cache_checks.lock();
310
                            // We are ready to resolve the in-flight actions. We remove the
311
                            // in-flight actions from the map.
312
0
                            inflight_cache_checks.remove(unique_key)
313
                        };
314
0
                        let Some(pending_txs) = maybe_pending_txs else {
315
0
                            return; // Nobody is waiting for this action anymore.
316
                        };
317
0
                        for (_client_operation_id, pending_tx) in pending_txs {
318
0
                            // Ignore errors here, as the other end may have hung up.
319
0
                            drop(pending_tx.send(Err(err.clone())));
320
0
                        }
321
0
                        return;
322
0
                    }
323
                }
324
            }
325
326
0
            let maybe_pending_txs = {
327
0
                let mut inflight_cache_checks = inflight_cache_checks.lock();
328
0
                inflight_cache_checks.remove(unique_key)
329
            };
330
0
            let Some(pending_txs) = maybe_pending_txs else {
331
0
                return; // Noone is waiting for this action anymore.
332
            };
333
334
0
            for (client_operation_id, pending_tx) in pending_txs {
335
                // Ignore errors here, as the other end may have hung up.
336
0
                drop(
337
0
                    pending_tx.send(
338
0
                        action_scheduler
339
0
                            .add_action(client_operation_id, action_info.clone())
340
0
                            .await,
341
                    ),
342
                );
343
            }
344
0
        });
345
0
        action_listener_rx
346
0
            .await
347
0
            .map_err(|err| {
348
0
                Error::from_std_err(Code::Internal, &err)
349
0
                    .append("ActionStateResult tx hung up in CacheLookupScheduler::add_action")
350
0
            })?
351
0
            .err_tip(|| "In CacheLookupScheduler::add_action")
352
1
    }
353
354
1
    async fn inner_filter_operations(
355
1
        &self,
356
1
        filter: OperationFilter,
357
1
    ) -> Result<ActionStateResultStream<'_>, Error> {
358
1
        self.action_scheduler
359
1
            .filter_operations(filter)
360
1
            .await
361
1
            .err_tip(|| "In CacheLookupScheduler::filter_operations")
362
1
    }
363
}
364
365
#[async_trait]
366
impl ClientStateManager for CacheLookupScheduler {
367
    async fn add_action(
368
        &self,
369
        client_operation_id: OperationId,
370
        action_info: Arc<ActionInfo>,
371
1
    ) -> Result<Box<dyn ActionStateResult>, Error> {
372
        self.inner_add_action(client_operation_id, action_info)
373
            .await
374
1
    }
375
376
    async fn filter_operations(
377
        &self,
378
        filter: OperationFilter,
379
1
    ) -> Result<ActionStateResultStream, Error> {
380
        self.inner_filter_operations(filter).await
381
1
    }
382
}
383
384
#[async_trait]
385
impl KnownPlatformPropertyProvider for CacheLookupScheduler {
386
0
    async fn get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> {
387
        self.action_scheduler
388
            .get_known_properties(instance_name)
389
            .await
390
0
    }
391
}
392
393
// Empty impl: no trait items are overridden here.
// NOTE(review): presumably this lets the scheduler be registered as a top-level metrics root; confirm against `nativelink_metric`.
impl RootMetricsComponent for CacheLookupScheduler {}