Coverage Report

Created: 2026-04-14 11:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/memory_awaited_action_db.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::ops::{Bound, RangeBounds};
16
use core::time::Duration;
17
use std::collections::hash_map::Entry;
18
use std::collections::{BTreeMap, BTreeSet, HashMap};
19
use std::sync::Arc;
20
21
use async_lock::Mutex;
22
use futures::{FutureExt, Stream};
23
use nativelink_config::stores::EvictionPolicy;
24
use nativelink_error::{Code, Error, ResultExt, error_if, make_err};
25
use nativelink_metric::MetricsComponent;
26
use nativelink_util::action_messages::{
27
    ActionInfo, ActionStage, ActionUniqueKey, ActionUniqueQualifier, OperationId,
28
};
29
use nativelink_util::chunked_stream::ChunkedStream;
30
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
31
use nativelink_util::instant_wrapper::InstantWrapper;
32
use nativelink_util::metrics::{
33
    EXECUTION_METRICS, ExecutionResult, ExecutionStage, make_execution_attributes,
34
};
35
use nativelink_util::spawn;
36
use nativelink_util::task::JoinHandleDropGuard;
37
use tokio::sync::{Notify, mpsc, watch};
38
use tracing::{debug, error};
39
40
use crate::awaited_action_db::{
41
    AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, CLIENT_KEEPALIVE_DURATION,
42
    SortedAwaitedAction, SortedAwaitedActionState,
43
};
44
45
/// Number of events to process per cycle.
46
const MAX_ACTION_EVENTS_RX_PER_CYCLE: usize = 1024;
47
48
/// Represents a client that is currently listening to an action.
49
/// When the client is dropped, it will send the `AwaitedAction` to the
50
/// `event_tx` if there are other cleanups needed.
51
#[derive(Debug)]
52
struct ClientAwaitedAction {
53
    /// The `OperationId` that the client is listening to.
54
    operation_id: OperationId,
55
56
    /// The sender to notify of this struct being dropped.
57
    event_tx: mpsc::UnboundedSender<ActionEvent>,
58
}
59
60
impl ClientAwaitedAction {
61
28
    pub(crate) const fn new(
62
28
        operation_id: OperationId,
63
28
        event_tx: mpsc::UnboundedSender<ActionEvent>,
64
28
    ) -> Self {
65
28
        Self {
66
28
            operation_id,
67
28
            event_tx,
68
28
        }
69
28
    }
70
71
3
    pub(crate) const fn operation_id(&self) -> &OperationId {
72
3
        &self.operation_id
73
3
    }
74
}
75
76
impl Drop for ClientAwaitedAction {
77
28
    fn drop(&mut self) {
78
        // If we failed to send it means noone is listening.
79
28
        drop(self.event_tx.send(ActionEvent::ClientDroppedOperation(
80
28
            self.operation_id.clone(),
81
28
        )));
82
28
    }
83
}
84
85
/// Trait to be able to use the `EvictingMap` with `ClientAwaitedAction`.
86
/// Note: We only use `EvictingMap` for a time based eviction, which is
87
/// why the implementation has fixed default values in it.
88
impl LenEntry for ClientAwaitedAction {
89
    #[inline]
90
30
    fn len(&self) -> u64 {
91
30
        0
92
30
    }
93
94
    #[inline]
95
0
    fn is_empty(&self) -> bool {
96
0
        true
97
0
    }
98
}
99
100
/// Actions the `AwaitedActionsDb` needs to process.
101
#[derive(Debug)]
102
pub(crate) enum ActionEvent {
103
    /// A client has sent a keep alive message.
104
    ClientKeepAlive(OperationId),
105
    /// A client has dropped and pointed to `OperationId`.
106
    ClientDroppedOperation(OperationId),
107
}
108
109
/// Information required to track an individual client
110
/// keep alive config and state.
111
#[derive(Debug)]
112
struct ClientInfo<I: InstantWrapper, NowFn: Fn() -> I> {
113
    /// The client operation id.
114
    client_operation_id: OperationId,
115
    /// The last time a keep alive was sent.
116
    last_keep_alive: I,
117
    /// The function to get the current time.
118
    now_fn: NowFn,
119
    /// The sender to notify of this struct had an event.
120
    event_tx: mpsc::UnboundedSender<ActionEvent>,
121
}
122
123
/// Subscriber that clients can be used to monitor when `AwaitedActions` change.
124
#[derive(Debug)]
125
pub struct MemoryAwaitedActionSubscriber<I: InstantWrapper, NowFn: Fn() -> I> {
126
    /// The receiver to listen for changes.
127
    awaited_action_rx: watch::Receiver<AwaitedAction>,
128
    /// If a client id is known this is the info needed to keep the client
129
    /// action alive.
130
    client_info: Option<ClientInfo<I, NowFn>>,
131
}
132
133
impl<I: InstantWrapper, NowFn: Fn() -> I> MemoryAwaitedActionSubscriber<I, NowFn> {
134
585
    fn new(mut awaited_action_rx: watch::Receiver<AwaitedAction>) -> Self {
135
585
        awaited_action_rx.mark_changed();
136
585
        Self {
137
585
            awaited_action_rx,
138
585
            client_info: None,
139
585
        }
140
585
    }
141
142
31
    fn new_with_client(
143
31
        mut awaited_action_rx: watch::Receiver<AwaitedAction>,
144
31
        client_operation_id: OperationId,
145
31
        event_tx: mpsc::UnboundedSender<ActionEvent>,
146
31
        now_fn: NowFn,
147
31
    ) -> Self
148
31
    where
149
31
        NowFn: Fn() -> I,
150
    {
151
31
        awaited_action_rx.mark_changed();
152
31
        Self {
153
31
            awaited_action_rx,
154
31
            client_info: Some(ClientInfo {
155
31
                client_operation_id,
156
31
                last_keep_alive: I::from_secs(0),
157
31
                now_fn,
158
31
                event_tx,
159
31
            }),
160
31
        }
161
31
    }
162
}
163
164
impl<I, NowFn> AwaitedActionSubscriber for MemoryAwaitedActionSubscriber<I, NowFn>
165
where
166
    I: InstantWrapper,
167
    NowFn: Fn() -> I + Send + Sync + 'static,
168
{
169
58
    async fn changed(&mut self) -> Result<AwaitedAction, Error> {
170
44
        let client_operation_id = {
171
58
            let changed_fut = self.awaited_action_rx.changed().map(|r| 
{44
172
44
                r.map_err(|e| 
{0
173
0
                    Error::from_std_err(Code::Internal, &e)
174
0
                        .append("Failed to wait for awaited action to change")
175
0
                })
176
44
            });
177
58
            let Some(client_info) = self.client_info.as_mut() else {
178
0
                changed_fut.await?;
179
0
                return Ok(self.awaited_action_rx.borrow().clone());
180
            };
181
58
            tokio::pin!(changed_fut);
182
            loop {
183
162
                if client_info.last_keep_alive.elapsed() > CLIENT_KEEPALIVE_DURATION {
184
55
                    client_info.last_keep_alive = (client_info.now_fn)();
185
55
                    // Failing to send just means our receiver dropped.
186
55
                    drop(client_info.event_tx.send(ActionEvent::ClientKeepAlive(
187
55
                        client_info.client_operation_id.clone(),
188
55
                    )));
189
107
                }
190
162
                let sleep_fut = (client_info.now_fn)().sleep(CLIENT_KEEPALIVE_DURATION);
191
162
                tokio::select! {
192
162
                    
result44
= &mut changed_fut => {
193
44
                        result
?0
;
194
44
                        break;
195
                    }
196
162
                    () = sleep_fut => {
197
104
                        // If we haven't received any updates for a while, we should
198
104
                        // let the database know that we are still listening to prevent
199
104
                        // the action from being dropped.
200
104
                    }
201
                }
202
            }
203
44
            client_info.client_operation_id.clone()
204
        };
205
        // At this stage we know that this event is a client request, so we need
206
        // to populate the client_operation_id.
207
44
        let mut awaited_action = self.awaited_action_rx.borrow().clone();
208
44
        awaited_action.set_client_operation_id(client_operation_id);
209
44
        Ok(awaited_action)
210
44
    }
211
212
672
    async fn borrow(&self) -> Result<AwaitedAction, Error> {
213
672
        let mut awaited_action = self.awaited_action_rx.borrow().clone();
214
672
        if let Some(
client_info16
) = self.client_info.as_ref() {
215
16
            awaited_action.set_client_operation_id(client_info.client_operation_id.clone());
216
656
        }
217
672
        Ok(awaited_action)
218
672
    }
219
}
220
221
/// A struct that is used to keep the developer from trying to
222
/// return early from a function.
223
struct NoEarlyReturn;
224
225
#[derive(Debug, Default, MetricsComponent)]
226
struct SortedAwaitedActions {
227
    #[metric(group = "unknown")]
228
    unknown: BTreeSet<SortedAwaitedAction>,
229
    #[metric(group = "cache_check")]
230
    cache_check: BTreeSet<SortedAwaitedAction>,
231
    #[metric(group = "queued")]
232
    queued: BTreeSet<SortedAwaitedAction>,
233
    #[metric(group = "executing")]
234
    executing: BTreeSet<SortedAwaitedAction>,
235
    #[metric(group = "completed")]
236
    completed: BTreeSet<SortedAwaitedAction>,
237
}
238
239
impl SortedAwaitedActions {
240
39
    const fn btree_for_state(&mut self, state: &ActionStage) -> &mut BTreeSet<SortedAwaitedAction> {
241
39
        match state {
242
0
            ActionStage::Unknown => &mut self.unknown,
243
0
            ActionStage::CacheCheck => &mut self.cache_check,
244
26
            ActionStage::Queued => &mut self.queued,
245
13
            ActionStage::Executing => &mut self.executing,
246
0
            ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => &mut self.completed,
247
        }
248
39
    }
249
250
64
    fn insert_sort_map_for_stage(
251
64
        &mut self,
252
64
        stage: &ActionStage,
253
64
        sorted_awaited_action: &SortedAwaitedAction,
254
64
    ) -> Result<(), Error> {
255
64
        let newly_inserted = match stage {
256
0
            ActionStage::Unknown => self.unknown.insert(sorted_awaited_action.clone()),
257
0
            ActionStage::CacheCheck => self.cache_check.insert(sorted_awaited_action.clone()),
258
32
            ActionStage::Queued => self.queued.insert(sorted_awaited_action.clone()),
259
26
            ActionStage::Executing => self.executing.insert(sorted_awaited_action.clone()),
260
6
            ActionStage::Completed(_) => self.completed.insert(sorted_awaited_action.clone()),
261
            ActionStage::CompletedFromCache(_) => {
262
0
                self.completed.insert(sorted_awaited_action.clone())
263
            }
264
        };
265
64
        if !newly_inserted {
266
0
            return Err(make_err!(
267
0
                Code::Internal,
268
0
                "Tried to insert an action that was already in the sorted map. This should never happen. {:?} - {:?}",
269
0
                stage,
270
0
                sorted_awaited_action
271
0
            ));
272
64
        }
273
64
        Ok(())
274
64
    }
275
276
39
    fn process_state_changes(
277
39
        &mut self,
278
39
        old_awaited_action: &AwaitedAction,
279
39
        new_awaited_action: &AwaitedAction,
280
39
    ) -> Result<(), Error> {
281
39
        let btree = self.btree_for_state(&old_awaited_action.state().stage);
282
39
        let maybe_sorted_awaited_action = btree.take(&SortedAwaitedAction {
283
39
            sort_key: old_awaited_action.sort_key(),
284
39
            operation_id: new_awaited_action.operation_id().clone(),
285
39
        });
286
287
39
        let Some(sorted_awaited_action) = maybe_sorted_awaited_action else {
288
0
            return Err(make_err!(
289
0
                Code::Internal,
290
0
                "sorted_action_info_hash_keys and action_info_hash_key_to_awaited_action are out of sync - {} - {:?}",
291
0
                new_awaited_action.operation_id(),
292
0
                new_awaited_action,
293
0
            ));
294
        };
295
296
39
        self.insert_sort_map_for_stage(&new_awaited_action.state().stage, &sorted_awaited_action)
297
39
            .err_tip(|| "In AwaitedActionDb::update_awaited_action")
?0
;
298
39
        Ok(())
299
39
    }
300
}
301
302
/// The database for storing the state of all actions.
303
#[derive(Debug, MetricsComponent)]
304
pub struct AwaitedActionDbImpl<I: InstantWrapper, NowFn: Fn() -> I> {
305
    /// A lookup table to lookup the state of an action by its client operation id.
306
    #[metric(group = "client_operation_ids")]
307
    client_operation_to_awaited_action:
308
        EvictingMap<OperationId, OperationId, Arc<ClientAwaitedAction>, I>,
309
310
    /// A lookup table to lookup the state of an action by its worker operation id.
311
    #[metric(group = "operation_ids")]
312
    operation_id_to_awaited_action: BTreeMap<OperationId, watch::Sender<AwaitedAction>>,
313
314
    /// A lookup table to lookup the state of an action by its unique qualifier.
315
    #[metric(group = "action_info_hash_key_to_awaited_action")]
316
    action_info_hash_key_to_awaited_action: HashMap<ActionUniqueKey, OperationId>,
317
318
    /// A sorted set of [`AwaitedAction`]s. A wrapper is used to perform sorting
319
    /// based on the [`AwaitedActionSortKey`] of the [`AwaitedAction`].
320
    ///
321
    /// See [`AwaitedActionSortKey`] for more information on the ordering.
322
    #[metric(group = "sorted_action_infos")]
323
    sorted_action_info_hash_keys: SortedAwaitedActions,
324
325
    /// The number of connected clients for each operation id.
326
    #[metric(group = "connected_clients_for_operation_id")]
327
    connected_clients_for_operation_id: HashMap<OperationId, usize>,
328
329
    /// Where to send notifications about important events related to actions.
330
    action_event_tx: mpsc::UnboundedSender<ActionEvent>,
331
332
    /// The function to get the current time.
333
    now_fn: NowFn,
334
}
335
336
impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbImpl<I, NowFn> {
337
3
    async fn get_awaited_action_by_id(
338
3
        &self,
339
3
        client_operation_id: &OperationId,
340
3
    ) -> Result<Option<MemoryAwaitedActionSubscriber<I, NowFn>>, Error> {
341
3
        let maybe_client_awaited_action = self
342
3
            .client_operation_to_awaited_action
343
3
            .get(client_operation_id)
344
3
            .await;
345
3
        let Some(client_awaited_action) = maybe_client_awaited_action else {
346
0
            return Ok(None);
347
        };
348
349
3
        self.operation_id_to_awaited_action
350
3
            .get(client_awaited_action.operation_id())
351
3
            .map(|tx| {
352
3
                Some(MemoryAwaitedActionSubscriber::new_with_client(
353
3
                    tx.subscribe(),
354
3
                    client_operation_id.clone(),
355
3
                    self.action_event_tx.clone(),
356
3
                    self.now_fn.clone(),
357
3
                ))
358
3
            })
359
3
            .ok_or_else(|| 
{0
360
0
                make_err!(
361
0
                    Code::Internal,
362
                    "Failed to get client operation id {client_operation_id:?}"
363
                )
364
0
            })
365
3
    }
366
367
    /// Processes action events that need to be handled by the database.
368
55
    async fn handle_action_events(
369
55
        &mut self,
370
55
        action_events: impl IntoIterator<Item = ActionEvent>,
371
55
    ) -> NoEarlyReturn {
372
55
        for action in action_events {
373
55
            debug!(?action, "Handling action");
374
55
            match action {
375
1
                ActionEvent::ClientDroppedOperation(operation_id) => {
376
                    // Cleanup operation_id_to_awaited_action.
377
1
                    let Some(tx) = self.operation_id_to_awaited_action.remove(&operation_id) else {
378
0
                        error!(
379
                            %operation_id,
380
                            "operation_id_to_awaited_action does not have operation_id"
381
                        );
382
0
                        continue;
383
                    };
384
385
1
                    let connected_clients = match self
386
1
                        .connected_clients_for_operation_id
387
1
                        .entry(operation_id.clone())
388
                    {
389
1
                        Entry::Occupied(entry) => {
390
1
                            let value = *entry.get();
391
1
                            entry.remove();
392
1
                            value - 1
393
                        }
394
                        Entry::Vacant(_) => {
395
0
                            error!(
396
                                %operation_id,
397
                                "connected_clients_for_operation_id does not have operation_id"
398
                            );
399
0
                            0
400
                        }
401
                    };
402
403
                    // Note: It is rare to have more than one client listening
404
                    // to the same action, so we assume that we are the last
405
                    // client and insert it back into the map if we detect that
406
                    // there are still clients listening (ie: the happy path
407
                    // is operation.connected_clients == 0).
408
1
                    if connected_clients != 0 {
409
1
                        self.operation_id_to_awaited_action
410
1
                            .insert(operation_id.clone(), tx);
411
1
                        self.connected_clients_for_operation_id
412
1
                            .insert(operation_id, connected_clients);
413
1
                        continue;
414
0
                    }
415
0
                    debug!(%operation_id, "Clearing operation from state manager");
416
0
                    let awaited_action = tx.borrow().clone();
417
                    // Cleanup action_info_hash_key_to_awaited_action if it was marked cached.
418
0
                    match &awaited_action.action_info().unique_qualifier {
419
0
                        ActionUniqueQualifier::Cacheable(action_key) => {
420
0
                            let maybe_awaited_action = self
421
0
                                .action_info_hash_key_to_awaited_action
422
0
                                .remove(action_key);
423
0
                            if !awaited_action.state().stage.is_finished()
424
0
                                && maybe_awaited_action.is_none()
425
                            {
426
0
                                error!(
427
                                    %operation_id,
428
                                    ?awaited_action,
429
                                    ?action_key,
430
                                    "action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync",
431
                                );
432
0
                            }
433
                        }
434
0
                        ActionUniqueQualifier::Uncacheable(_action_key) => {
435
0
                            // This Operation should not be in the hash_key map.
436
0
                        }
437
                    }
438
439
                    // Cleanup sorted_awaited_action.
440
0
                    let sort_key = awaited_action.sort_key();
441
0
                    let sort_btree_for_state = self
442
0
                        .sorted_action_info_hash_keys
443
0
                        .btree_for_state(&awaited_action.state().stage);
444
445
0
                    let maybe_sorted_awaited_action =
446
0
                        sort_btree_for_state.take(&SortedAwaitedAction {
447
0
                            sort_key,
448
0
                            operation_id: operation_id.clone(),
449
0
                        });
450
0
                    if maybe_sorted_awaited_action.is_none() {
451
0
                        error!(
452
                            %operation_id,
453
                            ?sort_key,
454
                            "Expected maybe_sorted_awaited_action to have {sort_key:?}",
455
                        );
456
0
                    }
457
                }
458
54
                ActionEvent::ClientKeepAlive(client_id) => {
459
54
                    if let Some(client_awaited_action) = self
460
54
                        .client_operation_to_awaited_action
461
54
                        .get(&client_id)
462
54
                        .await
463
                    {
464
54
                        if let Some(awaited_action_sender) = self
465
54
                            .operation_id_to_awaited_action
466
54
                            .get(&client_awaited_action.operation_id)
467
                        {
468
54
                            awaited_action_sender.send_if_modified(|awaited_action| {
469
54
                                awaited_action.update_client_keep_alive((self.now_fn)().now());
470
54
                                false
471
54
                            });
472
0
                        }
473
                    } else {
474
0
                        error!(
475
                            ?client_id,
476
                            "client_operation_to_awaited_action does not have client_id",
477
                        );
478
                    }
479
                }
480
            }
481
        }
482
55
        NoEarlyReturn
483
55
    }
484
485
2
    fn get_awaited_actions_range(
486
2
        &self,
487
2
        start: Bound<&OperationId>,
488
2
        end: Bound<&OperationId>,
489
2
    ) -> impl Iterator<Item = (&'_ OperationId, MemoryAwaitedActionSubscriber<I, NowFn>)>
490
2
    + use<'_, I, NowFn> {
491
2
        self.operation_id_to_awaited_action
492
2
            .range((start, end))
493
2
            .map(|(operation_id, tx)| 
{0
494
0
                (
495
0
                    operation_id,
496
0
                    MemoryAwaitedActionSubscriber::<I, NowFn>::new(tx.subscribe()),
497
0
                )
498
0
            })
499
2
    }
500
501
586
    fn get_by_operation_id(
502
586
        &self,
503
586
        operation_id: &OperationId,
504
586
    ) -> Option<MemoryAwaitedActionSubscriber<I, NowFn>> {
505
586
        self.operation_id_to_awaited_action
506
586
            .get(operation_id)
507
586
            .map(|tx| 
MemoryAwaitedActionSubscriber::<I, NowFn>::new585
(
tx585
.
subscribe585
()))
508
586
    }
509
510
1.13k
    fn get_range_of_actions(
511
1.13k
        &self,
512
1.13k
        state: SortedAwaitedActionState,
513
1.13k
        range: impl RangeBounds<SortedAwaitedAction>,
514
1.13k
    ) -> impl DoubleEndedIterator<
515
1.13k
        Item = Result<
516
1.13k
            (
517
1.13k
                &SortedAwaitedAction,
518
1.13k
                MemoryAwaitedActionSubscriber<I, NowFn>,
519
1.13k
            ),
520
1.13k
            Error,
521
1.13k
        >,
522
1.13k
    > {
523
1.13k
        let btree = match state {
524
0
            SortedAwaitedActionState::CacheCheck => &self.sorted_action_info_hash_keys.cache_check,
525
1.13k
            SortedAwaitedActionState::Queued => &self.sorted_action_info_hash_keys.queued,
526
0
            SortedAwaitedActionState::Executing => &self.sorted_action_info_hash_keys.executing,
527
0
            SortedAwaitedActionState::Completed => &self.sorted_action_info_hash_keys.completed,
528
        };
529
1.13k
        btree.range(range).map(|sorted_awaited_action| 
{545
530
545
            let operation_id = &sorted_awaited_action.operation_id;
531
545
            self.get_by_operation_id(operation_id)
532
545
                .ok_or_else(|| 
{0
533
0
                    make_err!(
534
0
                        Code::Internal,
535
                        "Failed to get operation id {}",
536
                        operation_id
537
                    )
538
0
                })
539
545
                .map(|subscriber| (sorted_awaited_action, subscriber))
540
545
        })
541
1.13k
    }
542
543
39
    fn process_state_changes_for_hash_key_map(
544
39
        action_info_hash_key_to_awaited_action: &mut HashMap<ActionUniqueKey, OperationId>,
545
39
        new_awaited_action: &AwaitedAction,
546
39
    ) {
547
        // Only process changes if the stage is not finished.
548
39
        if !new_awaited_action.state().stage.is_finished() {
549
33
            return;
550
6
        }
551
6
        match &new_awaited_action.action_info().unique_qualifier {
552
6
            ActionUniqueQualifier::Cacheable(action_key) => {
553
6
                let maybe_awaited_action =
554
6
                    action_info_hash_key_to_awaited_action.remove(action_key);
555
6
                match maybe_awaited_action {
556
6
                    Some(removed_operation_id) => {
557
6
                        if &removed_operation_id != new_awaited_action.operation_id() {
558
0
                            error!(
559
                                ?removed_operation_id,
560
                                ?new_awaited_action,
561
                                ?action_key,
562
                                "action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync",
563
                            );
564
6
                        }
565
                    }
566
                    None => {
567
0
                        error!(
568
                            ?new_awaited_action,
569
                            ?action_key,
570
                            "action_info_hash_key_to_awaited_action out of sync, it should have had the unique_key",
571
                        );
572
                    }
573
                }
574
            }
575
0
            ActionUniqueQualifier::Uncacheable(_action_key) => {
576
0
                // If we are not cacheable, the action should not be in the
577
0
                // hash_key map, so we don't need to process anything in
578
0
                // action_info_hash_key_to_awaited_action.
579
0
            }
580
        }
581
39
    }
582
583
39
    fn update_awaited_action(
584
39
        &mut self,
585
39
        mut new_awaited_action: AwaitedAction,
586
39
    ) -> Result<(), Error> {
587
39
        let tx = self
588
39
            .operation_id_to_awaited_action
589
39
            .get(new_awaited_action.operation_id())
590
39
            .ok_or_else(|| 
{0
591
0
                make_err!(
592
0
                    Code::Internal,
593
                    "OperationId does not exist in map in AwaitedActionDb::update_awaited_action"
594
                )
595
0
            })?;
596
        {
597
            // Note: It's important to drop old_awaited_action before we call
598
            // send_replace or we will have a deadlock.
599
39
            let old_awaited_action = tx.borrow();
600
601
            // Do not process changes if the action version is not in sync with
602
            // what the sender based the update on.
603
39
            if old_awaited_action.version() != new_awaited_action.version() {
604
0
                return Err(make_err!(
605
0
                    // From: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
606
0
                    // Use ABORTED if the client should retry at a higher level
607
0
                    // (e.g., when a client-specified test-and-set fails,
608
0
                    // indicating the client should restart a read-modify-write
609
0
                    // sequence)
610
0
                    Code::Aborted,
611
0
                    "{} Expected {} but got {} for operation_id {:?} - {:?}",
612
0
                    "Tried to update an awaited action with an incorrect version.",
613
0
                    old_awaited_action.version(),
614
0
                    new_awaited_action.version(),
615
0
                    old_awaited_action,
616
0
                    new_awaited_action,
617
0
                ));
618
39
            }
619
39
            new_awaited_action.increment_version();
620
621
0
            error_if!(
622
39
                old_awaited_action.action_info().unique_qualifier
623
39
                    != new_awaited_action.action_info().unique_qualifier,
624
                "Unique key changed for operation_id {:?} - {:?} - {:?}",
625
0
                new_awaited_action.operation_id(),
626
0
                old_awaited_action.action_info(),
627
0
                new_awaited_action.action_info(),
628
            );
629
39
            let is_same_stage = old_awaited_action
630
39
                .state()
631
39
                .stage
632
39
                .is_same_stage(&new_awaited_action.state().stage);
633
634
39
            if !is_same_stage {
635
                // Record metrics for stage transitions
636
39
                let metrics = &*EXECUTION_METRICS;
637
39
                let old_stage = &old_awaited_action.state().stage;
638
39
                let new_stage = &new_awaited_action.state().stage;
639
640
                // Track stage transitions
641
39
                let base_attrs = make_execution_attributes(
642
39
                    "unknown",
643
39
                    None,
644
39
                    Some(old_awaited_action.action_info().priority),
645
                );
646
39
                metrics.execution_stage_transitions.add(1, &base_attrs);
647
648
                // Update active count for old stage
649
39
                let old_stage_attrs = vec![opentelemetry::KeyValue::new(
650
                    nativelink_util::metrics::EXECUTION_STAGE,
651
39
                    ExecutionStage::from(old_stage),
652
                )];
653
39
                metrics.execution_active_count.add(-1, &old_stage_attrs);
654
655
                // Update active count for new stage
656
39
                let new_stage_attrs = vec![opentelemetry::KeyValue::new(
657
                    nativelink_util::metrics::EXECUTION_STAGE,
658
39
                    ExecutionStage::from(new_stage),
659
                )];
660
39
                metrics.execution_active_count.add(1, &new_stage_attrs);
661
662
                // Record completion metrics with action digest for failure tracking
663
39
                let action_digest = old_awaited_action.action_info().digest().to_string();
664
39
                if let ActionStage::Completed(
action_result6
) = new_stage {
665
6
                    let result_attrs = vec![
666
6
                        opentelemetry::KeyValue::new(
667
                            nativelink_util::metrics::EXECUTION_RESULT,
668
6
                            if action_result.exit_code == 0 {
669
5
                                ExecutionResult::Success
670
                            } else {
671
1
                                ExecutionResult::Failure
672
                            },
673
                        ),
674
6
                        opentelemetry::KeyValue::new(
675
                            nativelink_util::metrics::EXECUTION_ACTION_DIGEST,
676
6
                            action_digest,
677
                        ),
678
                    ];
679
6
                    metrics.execution_completed_count.add(1, &result_attrs);
680
33
                } else if let ActionStage::CompletedFromCache(_) = new_stage {
681
0
                    let result_attrs = vec![
682
0
                        opentelemetry::KeyValue::new(
683
0
                            nativelink_util::metrics::EXECUTION_RESULT,
684
0
                            ExecutionResult::CacheHit,
685
0
                        ),
686
0
                        opentelemetry::KeyValue::new(
687
0
                            nativelink_util::metrics::EXECUTION_ACTION_DIGEST,
688
0
                            action_digest,
689
0
                        ),
690
0
                    ];
691
0
                    metrics.execution_completed_count.add(1, &result_attrs);
692
33
                }
693
694
39
                self.sorted_action_info_hash_keys
695
39
                    .process_state_changes(&old_awaited_action, &new_awaited_action)
?0
;
696
39
                Self::process_state_changes_for_hash_key_map(
697
39
                    &mut self.action_info_hash_key_to_awaited_action,
698
39
                    &new_awaited_action,
699
                );
700
0
            }
701
        }
702
703
        // Notify all listeners of the new state and ignore if no one is listening.
704
        // Note: Do not use `.send()` as it will not update the state if all listeners
705
        // are dropped.
706
39
        drop(tx.send_replace(new_awaited_action));
707
708
39
        Ok(())
709
39
    }
710
711
    /// Creates a new [`ClientAwaitedAction`] and a [`watch::Receiver`] to
712
    /// listen for changes. We don't do this in-line because it is important
713
    /// to ALWAYS construct a [`ClientAwaitedAction`] before inserting it into
714
    /// the map. Failing to do so may result in memory leaks. This is because
715
    /// [`ClientAwaitedAction`] implements a drop function that will trigger
716
    /// cleanup of the other maps on drop.
717
25
    fn make_client_awaited_action(
718
25
        &mut self,
719
25
        operation_id: &OperationId,
720
25
        awaited_action: AwaitedAction,
721
25
    ) -> (Arc<ClientAwaitedAction>, watch::Receiver<AwaitedAction>) {
722
25
        let (tx, rx) = watch::channel(awaited_action);
723
25
        let client_awaited_action = Arc::new(ClientAwaitedAction::new(
724
25
            operation_id.clone(),
725
25
            self.action_event_tx.clone(),
726
        ));
727
25
        self.operation_id_to_awaited_action
728
25
            .insert(operation_id.clone(), tx);
729
25
        self.connected_clients_for_operation_id
730
25
            .insert(operation_id.clone(), 1);
731
25
        (client_awaited_action, rx)
732
25
    }
733
734
28
    async fn add_action(
735
28
        &mut self,
736
28
        client_operation_id: OperationId,
737
28
        action_info: Arc<ActionInfo>,
738
28
    ) -> Result<MemoryAwaitedActionSubscriber<I, NowFn>, Error> {
739
        // Check to see if the action is already known and subscribe if it is.
740
28
        let subscription_result = self
741
28
            .try_subscribe(
742
28
                &client_operation_id,
743
28
                &action_info.unique_qualifier,
744
28
                action_info.priority,
745
28
            )
746
28
            .await
747
28
            .err_tip(|| "In AwaitedActionDb::subscribe_or_add_action");
748
28
        match subscription_result {
749
0
            Err(err) => return Err(err),
750
3
            Ok(Some(subscription)) => return Ok(subscription),
751
25
            Ok(None) => { /* Add item to queue. */ }
752
        }
753
754
25
        let maybe_unique_key = match &action_info.unique_qualifier {
755
25
            ActionUniqueQualifier::Cacheable(unique_key) => Some(unique_key.clone()),
756
0
            ActionUniqueQualifier::Uncacheable(_unique_key) => None,
757
        };
758
25
        let operation_id = OperationId::default();
759
25
        let awaited_action = AwaitedAction::new(
760
25
            operation_id.clone(),
761
25
            action_info.clone(),
762
25
            (self.now_fn)().now(),
763
        );
764
25
        debug_assert!(
765
0
            ActionStage::Queued == awaited_action.state().stage,
766
            "Expected action to be queued"
767
        );
768
25
        let sort_key = awaited_action.sort_key();
769
770
25
        let (client_awaited_action, rx) =
771
25
            self.make_client_awaited_action(&operation_id.clone(), awaited_action);
772
773
25
        debug!(
774
            %client_operation_id,
775
            %operation_id,
776
            ?client_awaited_action,
777
            "Adding action"
778
        );
779
780
25
        self.client_operation_to_awaited_action
781
25
            .insert(client_operation_id.clone(), client_awaited_action)
782
25
            .await;
783
784
        // Note: We only put items in the map that are cacheable.
785
25
        if let Some(unique_key) = maybe_unique_key {
786
25
            let old_value = self
787
25
                .action_info_hash_key_to_awaited_action
788
25
                .insert(unique_key, operation_id.clone());
789
25
            if let Some(
old_value0
) = old_value {
790
0
                error!(
791
                    %operation_id,
792
                    ?old_value,
793
                    "action_info_hash_key_to_awaited_action already has unique_key"
794
                );
795
25
            }
796
0
        }
797
798
        // Record metric for new action entering the queue
799
25
        let metrics = &*EXECUTION_METRICS;
800
25
        let _base_attrs = make_execution_attributes("unknown", None, Some(action_info.priority));
801
25
        let queued_attrs = vec![opentelemetry::KeyValue::new(
802
            nativelink_util::metrics::EXECUTION_STAGE,
803
25
            ExecutionStage::Queued,
804
        )];
805
25
        metrics.execution_active_count.add(1, &queued_attrs);
806
807
25
        self.sorted_action_info_hash_keys
808
25
            .insert_sort_map_for_stage(
809
25
                &ActionStage::Queued,
810
25
                &SortedAwaitedAction {
811
25
                    sort_key,
812
25
                    operation_id,
813
25
                },
814
            )
815
25
            .err_tip(|| "In AwaitedActionDb::subscribe_or_add_action")
?0
;
816
817
25
        Ok(MemoryAwaitedActionSubscriber::new_with_client(
818
25
            rx,
819
25
            client_operation_id,
820
25
            self.action_event_tx.clone(),
821
25
            self.now_fn.clone(),
822
25
        ))
823
28
    }
824
825
28
    async fn try_subscribe(
826
28
        &mut self,
827
28
        client_operation_id: &OperationId,
828
28
        unique_qualifier: &ActionUniqueQualifier,
829
28
        // TODO(palfrey) To simplify the scheduler 2024 refactor, we
830
28
        // removed the ability to upgrade priorities of actions.
831
28
        // we should add priority upgrades back in.
832
28
        _priority: i32,
833
28
    ) -> Result<Option<MemoryAwaitedActionSubscriber<I, NowFn>>, Error> {
834
28
        let unique_key = match unique_qualifier {
835
28
            ActionUniqueQualifier::Cacheable(unique_key) => unique_key,
836
0
            ActionUniqueQualifier::Uncacheable(_unique_key) => return Ok(None),
837
        };
838
839
28
        let Some(
operation_id3
) = self.action_info_hash_key_to_awaited_action.get(unique_key) else {
840
25
            return Ok(None); // Not currently running.
841
        };
842
843
3
        let Some(tx) = self.operation_id_to_awaited_action.get(operation_id) else {
844
0
            return Err(make_err!(
845
0
                Code::Internal,
846
0
                "operation_id_to_awaited_action and action_info_hash_key_to_awaited_action are out of sync for {unique_key:?} - {operation_id}"
847
0
            ));
848
        };
849
850
0
        error_if!(
851
3
            tx.borrow().state().stage.is_finished(),
852
            "Tried to subscribe to a completed action but it already finished. This should never happen. {:?}",
853
0
            tx.borrow()
854
        );
855
856
3
        let maybe_connected_clients = self
857
3
            .connected_clients_for_operation_id
858
3
            .get_mut(operation_id);
859
3
        let Some(connected_clients) = maybe_connected_clients else {
860
0
            return Err(make_err!(
861
0
                Code::Internal,
862
0
                "connected_clients_for_operation_id and operation_id_to_awaited_action are out of sync for {unique_key:?} - {operation_id}"
863
0
            ));
864
        };
865
3
        *connected_clients += 1;
866
867
        // Immediately mark the keep alive, we don't need to wake anyone
868
        // so we always fake that it was not actually changed.
869
        // Failing update the client could lead to the client connecting
870
        // then not updating the keep alive in time, resulting in the
871
        // operation timing out due to async behavior.
872
3
        tx.send_if_modified(|awaited_action| {
873
3
            awaited_action.update_client_keep_alive((self.now_fn)().now());
874
3
            false
875
3
        });
876
3
        let subscription = tx.subscribe();
877
878
3
        self.client_operation_to_awaited_action
879
3
            .insert(
880
3
                client_operation_id.clone(),
881
3
                Arc::new(ClientAwaitedAction::new(
882
3
                    operation_id.clone(),
883
3
                    self.action_event_tx.clone(),
884
3
                )),
885
3
            )
886
3
            .await;
887
888
3
        Ok(Some(MemoryAwaitedActionSubscriber::new_with_client(
889
3
            subscription,
890
3
            client_operation_id.clone(),
891
3
            self.action_event_tx.clone(),
892
3
            self.now_fn.clone(),
893
3
        )))
894
28
    }
895
}
896
897
/// In-memory awaited-action database.
///
/// Wraps the shared `AwaitedActionDbImpl` state behind an async mutex and
/// owns the background task that processes action events for it.
#[derive(Debug, MetricsComponent)]
pub struct MemoryAwaitedActionDb<I: InstantWrapper, NowFn: Fn() -> I> {
    /// Shared database state; every public operation locks this first.
    #[metric]
    inner: Arc<Mutex<AwaitedActionDbImpl<I, NowFn>>>,
    /// Notified (`notify_one`) after an action is added or updated.
    tasks_change_notify: Arc<Notify>,
    /// Guard for the spawned "handle_awaited_action_events" task.
    /// NOTE(review): presumably stops the task when dropped -- confirm
    /// against `JoinHandleDropGuard`.
    _handle_awaited_action_events: JoinHandleDropGuard<()>,
}
904
905
impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static>
906
    MemoryAwaitedActionDb<I, NowFn>
907
{
908
23
    pub fn new(
909
23
        eviction_config: &EvictionPolicy,
910
23
        tasks_change_notify: Arc<Notify>,
911
23
        now_fn: NowFn,
912
23
    ) -> Self {
913
23
        let (action_event_tx, mut action_event_rx) = mpsc::unbounded_channel();
914
23
        let inner = Arc::new(Mutex::new(AwaitedActionDbImpl {
915
23
            client_operation_to_awaited_action: EvictingMap::new(eviction_config, (now_fn)()),
916
23
            operation_id_to_awaited_action: BTreeMap::new(),
917
23
            action_info_hash_key_to_awaited_action: HashMap::new(),
918
23
            sorted_action_info_hash_keys: SortedAwaitedActions::default(),
919
23
            connected_clients_for_operation_id: HashMap::new(),
920
23
            action_event_tx,
921
23
            now_fn,
922
23
        }));
923
23
        let weak_inner = Arc::downgrade(&inner);
924
        Self {
925
23
            inner,
926
23
            tasks_change_notify,
927
23
            _handle_awaited_action_events: spawn!("handle_awaited_action_events", async move 
{21
928
21
                let mut dropped_operation_ids = Vec::with_capacity(MAX_ACTION_EVENTS_RX_PER_CYCLE);
929
                loop {
930
76
                    dropped_operation_ids.clear();
931
76
                    action_event_rx
932
76
                        .recv_many(&mut dropped_operation_ids, MAX_ACTION_EVENTS_RX_PER_CYCLE)
933
76
                        .await;
934
55
                    let Some(inner) = weak_inner.upgrade() else {
935
0
                        return; // Nothing to cleanup, our struct is dropped.
936
                    };
937
55
                    let mut inner = inner.lock().await;
938
55
                    inner
939
55
                        .handle_action_events(dropped_operation_ids.drain(..))
940
55
                        .await;
941
                }
942
0
            }),
943
        }
944
23
    }
945
}
946
947
impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> AwaitedActionDb
948
    for MemoryAwaitedActionDb<I, NowFn>
949
{
950
    type Subscriber = MemoryAwaitedActionSubscriber<I, NowFn>;
951
952
3
    async fn get_awaited_action_by_id(
953
3
        &self,
954
3
        client_operation_id: &OperationId,
955
3
    ) -> Result<Option<Self::Subscriber>, Error> {
956
3
        self.inner
957
3
            .lock()
958
3
            .await
959
3
            .get_awaited_action_by_id(client_operation_id)
960
3
            .await
961
3
    }
962
963
2
    async fn get_all_awaited_actions(
964
2
        &self,
965
2
    ) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>>, Error> {
966
2
        Ok(ChunkedStream::new(
967
2
            Bound::Unbounded,
968
2
            Bound::Unbounded,
969
2
            move |start, end, mut output| async move {
970
2
                let inner = self.inner.lock().await;
971
2
                let mut maybe_new_start = None;
972
973
0
                for (operation_id, item) in
974
2
                    inner.get_awaited_actions_range(start.as_ref(), end.as_ref())
975
0
                {
976
0
                    output.push_back(item);
977
0
                    maybe_new_start = Some(operation_id);
978
0
                }
979
980
2
                Ok(maybe_new_start
981
2
                    .map(|new_start| (
(Bound::Excluded(new_start.clone()), end)0
,
output0
)))
982
4
            },
983
        ))
984
2
    }
985
986
41
    async fn get_by_operation_id(
987
41
        &self,
988
41
        operation_id: &OperationId,
989
41
    ) -> Result<Option<Self::Subscriber>, Error> {
990
41
        Ok(self.inner.lock().await.get_by_operation_id(operation_id))
991
41
    }
992
993
593
    async fn get_range_of_actions(
994
593
        &self,
995
593
        state: SortedAwaitedActionState,
996
593
        start: Bound<SortedAwaitedAction>,
997
593
        end: Bound<SortedAwaitedAction>,
998
593
        desc: bool,
999
593
    ) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>> + Send, Error> {
1000
593
        Ok(ChunkedStream::new(
1001
593
            start,
1002
593
            end,
1003
1.13k
            move |start, end, mut output| async move {
1004
1.13k
                let inner = self.inner.lock().await;
1005
1.13k
                let mut done = true;
1006
1.13k
                let mut new_start = start.as_ref();
1007
1.13k
                let mut new_end = end.as_ref();
1008
1009
1.13k
                let iterator = inner
1010
1.13k
                    .get_range_of_actions(state, (start.as_ref(), end.as_ref()))
1011
1.13k
                    .map(|res| 
res545
.
err_tip545
(|| "In AwaitedActionDb::get_range_of_actions"));
1012
1013
                // TODO(palfrey) This should probably use the `.left()/right()` pattern,
1014
                // but that doesn't exist in the std or any libraries we use.
1015
1.13k
                if desc {
1016
135
                    for 
result45
in iterator.rev() {
1017
45
                        let (sorted_awaited_action, item) =
1018
45
                            result.err_tip(|| "In AwaitedActionDb::get_range_of_actions")
?0
;
1019
45
                        output.push_back(item);
1020
45
                        new_end = Bound::Excluded(sorted_awaited_action);
1021
45
                        done = false;
1022
                    }
1023
                } else {
1024
1.00k
                    for 
result500
in iterator {
1025
500
                        let (sorted_awaited_action, item) =
1026
500
                            result.err_tip(|| "In AwaitedActionDb::get_range_of_actions")
?0
;
1027
500
                        output.push_back(item);
1028
500
                        new_start = Bound::Excluded(sorted_awaited_action);
1029
500
                        done = false;
1030
                    }
1031
                }
1032
1.13k
                if done {
1033
593
                    return Ok(None);
1034
542
                }
1035
542
                Ok(Some(((new_start.cloned(), new_end.cloned()), output)))
1036
2.27k
            },
1037
        ))
1038
593
    }
1039
1040
39
    async fn update_awaited_action(&self, new_awaited_action: AwaitedAction) -> Result<(), Error> {
1041
39
        self.inner
1042
39
            .lock()
1043
39
            .await
1044
39
            .update_awaited_action(new_awaited_action)
?0
;
1045
39
        self.tasks_change_notify.notify_one();
1046
39
        Ok(())
1047
39
    }
1048
1049
28
    async fn add_action(
1050
28
        &self,
1051
28
        client_operation_id: OperationId,
1052
28
        action_info: Arc<ActionInfo>,
1053
28
        _no_event_action_timeout: Duration,
1054
28
    ) -> Result<Self::Subscriber, Error> {
1055
28
        let subscriber = self
1056
28
            .inner
1057
28
            .lock()
1058
28
            .await
1059
28
            .add_action(client_operation_id, action_info)
1060
28
            .await
?0
;
1061
28
        self.tasks_change_notify.notify_one();
1062
28
        Ok(subscriber)
1063
28
    }
1064
}