Coverage Report

Created: 2025-12-16 15:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/memory_awaited_action_db.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::ops::{Bound, RangeBounds};
16
use core::time::Duration;
17
use std::collections::hash_map::Entry;
18
use std::collections::{BTreeMap, BTreeSet, HashMap};
19
use std::sync::Arc;
20
21
use async_lock::Mutex;
22
use futures::{FutureExt, Stream};
23
use nativelink_config::stores::EvictionPolicy;
24
use nativelink_error::{Code, Error, ResultExt, error_if, make_err};
25
use nativelink_metric::MetricsComponent;
26
use nativelink_util::action_messages::{
27
    ActionInfo, ActionStage, ActionUniqueKey, ActionUniqueQualifier, OperationId,
28
};
29
use nativelink_util::chunked_stream::ChunkedStream;
30
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
31
use nativelink_util::instant_wrapper::InstantWrapper;
32
use nativelink_util::metrics::{
33
    EXECUTION_METRICS, ExecutionResult, ExecutionStage, make_execution_attributes,
34
};
35
use nativelink_util::spawn;
36
use nativelink_util::task::JoinHandleDropGuard;
37
use tokio::sync::{Notify, mpsc, watch};
38
use tracing::{debug, error};
39
40
use crate::awaited_action_db::{
41
    AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, CLIENT_KEEPALIVE_DURATION,
42
    SortedAwaitedAction, SortedAwaitedActionState,
43
};
44
45
/// Number of events to process per cycle.
46
const MAX_ACTION_EVENTS_RX_PER_CYCLE: usize = 1024;
47
48
/// Represents a client that is currently listening to an action.
49
/// When the client is dropped, it sends an `ActionEvent::ClientDroppedOperation`
50
/// carrying its `OperationId` to `event_tx` so any needed cleanup can run.
51
#[derive(Debug)]
52
struct ClientAwaitedAction {
53
    /// The `OperationId` that the client is listening to.
54
    operation_id: OperationId,
55
56
    /// The sender to notify of this struct being dropped.
57
    event_tx: mpsc::UnboundedSender<ActionEvent>,
58
}
59
60
impl ClientAwaitedAction {
61
27
    pub(crate) const fn new(
62
27
        operation_id: OperationId,
63
27
        event_tx: mpsc::UnboundedSender<ActionEvent>,
64
27
    ) -> Self {
65
27
        Self {
66
27
            operation_id,
67
27
            event_tx,
68
27
        }
69
27
    }
70
71
3
    pub(crate) const fn operation_id(&self) -> &OperationId {
72
3
        &self.operation_id
73
3
    }
74
}
75
76
impl Drop for ClientAwaitedAction {
77
27
    fn drop(&mut self) {
78
        // If we failed to send, it means no one is listening.
79
27
        drop(self.event_tx.send(ActionEvent::ClientDroppedOperation(
80
27
            self.operation_id.clone(),
81
27
        )));
82
27
    }
83
}
84
85
/// Trait to be able to use the `EvictingMap` with `ClientAwaitedAction`.
86
/// Note: We only use `EvictingMap` for a time based eviction, which is
87
/// why the implementation has fixed default values in it.
88
impl LenEntry for ClientAwaitedAction {
89
    #[inline]
90
29
    fn len(&self) -> u64 {
91
29
        0
92
29
    }
93
94
    #[inline]
95
0
    fn is_empty(&self) -> bool {
96
0
        true
97
0
    }
98
}
99
100
/// Events the `AwaitedActionDb` needs to process.
101
#[derive(Debug)]
102
pub(crate) enum ActionEvent {
103
    /// A client has sent a keep alive message.
104
    ClientKeepAlive(OperationId),
105
    /// A client has dropped and pointed to `OperationId`.
106
    ClientDroppedOperation(OperationId),
107
}
108
109
/// Information required to track an individual client
110
/// keep alive config and state.
111
#[derive(Debug)]
112
struct ClientInfo<I: InstantWrapper, NowFn: Fn() -> I> {
113
    /// The client operation id.
114
    client_operation_id: OperationId,
115
    /// The last time a keep alive was sent.
116
    last_keep_alive: I,
117
    /// The function to get the current time.
118
    now_fn: NowFn,
119
    /// The sender used to notify that this struct had an event.
120
    event_tx: mpsc::UnboundedSender<ActionEvent>,
121
}
122
123
/// Subscriber that clients can use to monitor when `AwaitedActions` change.
124
#[derive(Debug)]
125
pub struct MemoryAwaitedActionSubscriber<I: InstantWrapper, NowFn: Fn() -> I> {
126
    /// The receiver to listen for changes.
127
    awaited_action_rx: watch::Receiver<AwaitedAction>,
128
    /// If a client id is known this is the info needed to keep the client
129
    /// action alive.
130
    client_info: Option<ClientInfo<I, NowFn>>,
131
}
132
133
impl<I: InstantWrapper, NowFn: Fn() -> I> MemoryAwaitedActionSubscriber<I, NowFn> {
134
583
    fn new(mut awaited_action_rx: watch::Receiver<AwaitedAction>) -> Self {
135
583
        awaited_action_rx.mark_changed();
136
583
        Self {
137
583
            awaited_action_rx,
138
583
            client_info: None,
139
583
        }
140
583
    }
141
142
30
    fn new_with_client(
143
30
        mut awaited_action_rx: watch::Receiver<AwaitedAction>,
144
30
        client_operation_id: OperationId,
145
30
        event_tx: mpsc::UnboundedSender<ActionEvent>,
146
30
        now_fn: NowFn,
147
30
    ) -> Self
148
30
    where
149
30
        NowFn: Fn() -> I,
150
    {
151
30
        awaited_action_rx.mark_changed();
152
30
        Self {
153
30
            awaited_action_rx,
154
30
            client_info: Some(ClientInfo {
155
30
                client_operation_id,
156
30
                last_keep_alive: I::from_secs(0),
157
30
                now_fn,
158
30
                event_tx,
159
30
            }),
160
30
        }
161
30
    }
162
}
163
164
impl<I, NowFn> AwaitedActionSubscriber for MemoryAwaitedActionSubscriber<I, NowFn>
165
where
166
    I: InstantWrapper,
167
    NowFn: Fn() -> I + Send + Sync + 'static,
168
{
169
58
    async fn changed(&mut self) -> Result<AwaitedAction, Error> {
170
44
        let client_operation_id = {
171
58
            let changed_fut = self.awaited_action_rx.changed().map(|r| 
{44
172
44
                r.map_err(|e| 
{0
173
0
                    make_err!(
174
0
                        Code::Internal,
175
                        "Failed to wait for awaited action to change {e:?}"
176
                    )
177
0
                })
178
44
            });
179
58
            let Some(client_info) = self.client_info.as_mut() else {
  Branch (179:17): [True: 0, False: 0]
  Branch (179:17): [Folded - Ignored]
  Branch (179:17): [True: 58, False: 0]
180
0
                changed_fut.await?;
181
0
                return Ok(self.awaited_action_rx.borrow().clone());
182
            };
183
58
            tokio::pin!(changed_fut);
184
            loop {
185
164
                if client_info.last_keep_alive.elapsed() > CLIENT_KEEPALIVE_DURATION {
  Branch (185:20): [True: 0, False: 0]
  Branch (185:20): [Folded - Ignored]
  Branch (185:20): [True: 55, False: 109]
186
55
                    client_info.last_keep_alive = (client_info.now_fn)();
187
55
                    // Failing to send just means our receiver dropped.
188
55
                    drop(client_info.event_tx.send(ActionEvent::ClientKeepAlive(
189
55
                        client_info.client_operation_id.clone(),
190
55
                    )));
191
109
                }
192
164
                let sleep_fut = (client_info.now_fn)().sleep(CLIENT_KEEPALIVE_DURATION);
193
164
                tokio::select! {
194
164
                    
result44
= &mut changed_fut => {
195
44
                        result
?0
;
196
44
                        break;
197
                    }
198
164
                    () = sleep_fut => {
199
106
                        // If we haven't received any updates for a while, we should
200
106
                        // let the database know that we are still listening to prevent
201
106
                        // the action from being dropped.
202
106
                    }
203
                }
204
            }
205
44
            client_info.client_operation_id.clone()
206
        };
207
        // At this stage we know that this event is a client request, so we need
208
        // to populate the client_operation_id.
209
44
        let mut awaited_action = self.awaited_action_rx.borrow().clone();
210
44
        awaited_action.set_client_operation_id(client_operation_id);
211
44
        Ok(awaited_action)
212
44
    }
213
214
668
    async fn borrow(&self) -> Result<AwaitedAction, Error> {
215
668
        let mut awaited_action = self.awaited_action_rx.borrow().clone();
216
668
        if let Some(
client_info16
) = self.client_info.as_ref() {
  Branch (216:16): [True: 0, False: 0]
  Branch (216:16): [Folded - Ignored]
  Branch (216:16): [True: 0, False: 0]
  Branch (216:16): [True: 16, False: 652]
217
16
            awaited_action.set_client_operation_id(client_info.client_operation_id.clone());
218
652
        }
219
668
        Ok(awaited_action)
220
668
    }
221
}
222
223
/// A struct that is used to keep the developer from trying to
224
/// return early from a function.
225
struct NoEarlyReturn;
226
227
#[derive(Debug, Default, MetricsComponent)]
228
struct SortedAwaitedActions {
229
    #[metric(group = "unknown")]
230
    unknown: BTreeSet<SortedAwaitedAction>,
231
    #[metric(group = "cache_check")]
232
    cache_check: BTreeSet<SortedAwaitedAction>,
233
    #[metric(group = "queued")]
234
    queued: BTreeSet<SortedAwaitedAction>,
235
    #[metric(group = "executing")]
236
    executing: BTreeSet<SortedAwaitedAction>,
237
    #[metric(group = "completed")]
238
    completed: BTreeSet<SortedAwaitedAction>,
239
}
240
241
impl SortedAwaitedActions {
242
39
    const fn btree_for_state(&mut self, state: &ActionStage) -> &mut BTreeSet<SortedAwaitedAction> {
243
39
        match state {
244
0
            ActionStage::Unknown => &mut self.unknown,
245
0
            ActionStage::CacheCheck => &mut self.cache_check,
246
26
            ActionStage::Queued => &mut self.queued,
247
13
            ActionStage::Executing => &mut self.executing,
248
0
            ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => &mut self.completed,
249
        }
250
39
    }
251
252
63
    fn insert_sort_map_for_stage(
253
63
        &mut self,
254
63
        stage: &ActionStage,
255
63
        sorted_awaited_action: &SortedAwaitedAction,
256
63
    ) -> Result<(), Error> {
257
63
        let newly_inserted = match stage {
258
0
            ActionStage::Unknown => self.unknown.insert(sorted_awaited_action.clone()),
259
0
            ActionStage::CacheCheck => self.cache_check.insert(sorted_awaited_action.clone()),
260
31
            ActionStage::Queued => self.queued.insert(sorted_awaited_action.clone()),
261
26
            ActionStage::Executing => self.executing.insert(sorted_awaited_action.clone()),
262
6
            ActionStage::Completed(_) => self.completed.insert(sorted_awaited_action.clone()),
263
            ActionStage::CompletedFromCache(_) => {
264
0
                self.completed.insert(sorted_awaited_action.clone())
265
            }
266
        };
267
63
        if !newly_inserted {
  Branch (267:12): [True: 0, False: 63]
  Branch (267:12): [Folded - Ignored]
268
0
            return Err(make_err!(
269
0
                Code::Internal,
270
0
                "Tried to insert an action that was already in the sorted map. This should never happen. {:?} - {:?}",
271
0
                stage,
272
0
                sorted_awaited_action
273
0
            ));
274
63
        }
275
63
        Ok(())
276
63
    }
277
278
39
    fn process_state_changes(
279
39
        &mut self,
280
39
        old_awaited_action: &AwaitedAction,
281
39
        new_awaited_action: &AwaitedAction,
282
39
    ) -> Result<(), Error> {
283
39
        let btree = self.btree_for_state(&old_awaited_action.state().stage);
284
39
        let maybe_sorted_awaited_action = btree.take(&SortedAwaitedAction {
285
39
            sort_key: old_awaited_action.sort_key(),
286
39
            operation_id: new_awaited_action.operation_id().clone(),
287
39
        });
288
289
39
        let Some(sorted_awaited_action) = maybe_sorted_awaited_action else {
  Branch (289:13): [True: 39, False: 0]
  Branch (289:13): [Folded - Ignored]
290
0
            return Err(make_err!(
291
0
                Code::Internal,
292
0
                "sorted_action_info_hash_keys and action_info_hash_key_to_awaited_action are out of sync - {} - {:?}",
293
0
                new_awaited_action.operation_id(),
294
0
                new_awaited_action,
295
0
            ));
296
        };
297
298
39
        self.insert_sort_map_for_stage(&new_awaited_action.state().stage, &sorted_awaited_action)
299
39
            .err_tip(|| "In AwaitedActionDb::update_awaited_action")
?0
;
300
39
        Ok(())
301
39
    }
302
}
303
304
/// The database for storing the state of all actions.
305
#[derive(Debug, MetricsComponent)]
306
pub struct AwaitedActionDbImpl<I: InstantWrapper, NowFn: Fn() -> I> {
307
    /// A lookup table to look up the state of an action by its client operation id.
308
    #[metric(group = "client_operation_ids")]
309
    client_operation_to_awaited_action:
310
        EvictingMap<OperationId, OperationId, Arc<ClientAwaitedAction>, I>,
311
312
    /// A lookup table to look up the state of an action by its worker operation id.
313
    #[metric(group = "operation_ids")]
314
    operation_id_to_awaited_action: BTreeMap<OperationId, watch::Sender<AwaitedAction>>,
315
316
    /// A lookup table to look up the state of an action by its unique qualifier.
317
    #[metric(group = "action_info_hash_key_to_awaited_action")]
318
    action_info_hash_key_to_awaited_action: HashMap<ActionUniqueKey, OperationId>,
319
320
    /// A sorted set of [`AwaitedAction`]s. A wrapper is used to perform sorting
321
    /// based on the [`AwaitedActionSortKey`] of the [`AwaitedAction`].
322
    ///
323
    /// See [`AwaitedActionSortKey`] for more information on the ordering.
324
    #[metric(group = "sorted_action_infos")]
325
    sorted_action_info_hash_keys: SortedAwaitedActions,
326
327
    /// The number of connected clients for each operation id.
328
    #[metric(group = "connected_clients_for_operation_id")]
329
    connected_clients_for_operation_id: HashMap<OperationId, usize>,
330
331
    /// Where to send notifications about important events related to actions.
332
    action_event_tx: mpsc::UnboundedSender<ActionEvent>,
333
334
    /// The function to get the current time.
335
    now_fn: NowFn,
336
}
337
338
impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbImpl<I, NowFn> {
339
3
    async fn get_awaited_action_by_id(
340
3
        &self,
341
3
        client_operation_id: &OperationId,
342
3
    ) -> Result<Option<MemoryAwaitedActionSubscriber<I, NowFn>>, Error> {
343
3
        let maybe_client_awaited_action = self
344
3
            .client_operation_to_awaited_action
345
3
            .get(client_operation_id)
346
3
            .await;
347
3
        let Some(client_awaited_action) = maybe_client_awaited_action else {
  Branch (347:13): [True: 0, False: 0]
  Branch (347:13): [Folded - Ignored]
  Branch (347:13): [True: 3, False: 0]
348
0
            return Ok(None);
349
        };
350
351
3
        self.operation_id_to_awaited_action
352
3
            .get(client_awaited_action.operation_id())
353
3
            .map(|tx| {
354
3
                Some(MemoryAwaitedActionSubscriber::new_with_client(
355
3
                    tx.subscribe(),
356
3
                    client_operation_id.clone(),
357
3
                    self.action_event_tx.clone(),
358
3
                    self.now_fn.clone(),
359
3
                ))
360
3
            })
361
3
            .ok_or_else(|| 
{0
362
0
                make_err!(
363
0
                    Code::Internal,
364
                    "Failed to get client operation id {client_operation_id:?}"
365
                )
366
0
            })
367
3
    }
368
369
    /// Processes action events that need to be handled by the database.
370
55
    async fn handle_action_events(
371
55
        &mut self,
372
55
        action_events: impl IntoIterator<Item = ActionEvent>,
373
55
    ) -> NoEarlyReturn {
374
110
        for 
action55
in action_events {
375
55
            debug!(?action, "Handling action");
376
55
            match action {
377
1
                ActionEvent::ClientDroppedOperation(operation_id) => {
378
                    // Cleanup operation_id_to_awaited_action.
379
1
                    let Some(tx) = self.operation_id_to_awaited_action.remove(&operation_id) else {
  Branch (379:25): [True: 0, False: 0]
  Branch (379:25): [Folded - Ignored]
  Branch (379:25): [True: 0, False: 0]
  Branch (379:25): [True: 1, False: 0]
380
0
                        error!(
381
                            %operation_id,
382
0
                            "operation_id_to_awaited_action does not have operation_id"
383
                        );
384
0
                        continue;
385
                    };
386
387
1
                    let connected_clients = match self
388
1
                        .connected_clients_for_operation_id
389
1
                        .entry(operation_id.clone())
390
                    {
391
1
                        Entry::Occupied(entry) => {
392
1
                            let value = *entry.get();
393
1
                            entry.remove();
394
1
                            value - 1
395
                        }
396
                        Entry::Vacant(_) => {
397
0
                            error!(
398
                                %operation_id,
399
0
                                "connected_clients_for_operation_id does not have operation_id"
400
                            );
401
0
                            0
402
                        }
403
                    };
404
405
                    // Note: It is rare to have more than one client listening
406
                    // to the same action, so we assume that we are the last
407
                    // client and insert it back into the map if we detect that
408
                    // there are still clients listening (ie: the happy path
409
                    // is operation.connected_clients == 0).
410
1
                    if connected_clients != 0 {
  Branch (410:24): [True: 0, False: 0]
  Branch (410:24): [Folded - Ignored]
  Branch (410:24): [True: 0, False: 0]
  Branch (410:24): [True: 1, False: 0]
411
1
                        self.operation_id_to_awaited_action
412
1
                            .insert(operation_id.clone(), tx);
413
1
                        self.connected_clients_for_operation_id
414
1
                            .insert(operation_id, connected_clients);
415
1
                        continue;
416
0
                    }
417
0
                    debug!(%operation_id, "Clearing operation from state manager");
418
0
                    let awaited_action = tx.borrow().clone();
419
                    // Cleanup action_info_hash_key_to_awaited_action if the action is cacheable.
420
0
                    match &awaited_action.action_info().unique_qualifier {
421
0
                        ActionUniqueQualifier::Cacheable(action_key) => {
422
0
                            let maybe_awaited_action = self
423
0
                                .action_info_hash_key_to_awaited_action
424
0
                                .remove(action_key);
425
0
                            if !awaited_action.state().stage.is_finished()
  Branch (425:32): [True: 0, False: 0]
  Branch (425:32): [Folded - Ignored]
  Branch (425:32): [True: 0, False: 0]
  Branch (425:32): [True: 0, False: 0]
426
0
                                && maybe_awaited_action.is_none()
  Branch (426:36): [True: 0, False: 0]
  Branch (426:36): [Folded - Ignored]
  Branch (426:36): [True: 0, False: 0]
  Branch (426:36): [True: 0, False: 0]
427
                            {
428
0
                                error!(
429
                                    %operation_id,
430
                                    ?awaited_action,
431
                                    ?action_key,
432
0
                                    "action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync",
433
                                );
434
0
                            }
435
                        }
436
0
                        ActionUniqueQualifier::Uncacheable(_action_key) => {
437
0
                            // This Operation should not be in the hash_key map.
438
0
                        }
439
                    }
440
441
                    // Cleanup sorted_awaited_action.
442
0
                    let sort_key = awaited_action.sort_key();
443
0
                    let sort_btree_for_state = self
444
0
                        .sorted_action_info_hash_keys
445
0
                        .btree_for_state(&awaited_action.state().stage);
446
447
0
                    let maybe_sorted_awaited_action =
448
0
                        sort_btree_for_state.take(&SortedAwaitedAction {
449
0
                            sort_key,
450
0
                            operation_id: operation_id.clone(),
451
0
                        });
452
0
                    if maybe_sorted_awaited_action.is_none() {
  Branch (452:24): [True: 0, False: 0]
  Branch (452:24): [Folded - Ignored]
  Branch (452:24): [True: 0, False: 0]
  Branch (452:24): [True: 0, False: 0]
453
0
                        error!(
454
                            %operation_id,
455
                            ?sort_key,
456
0
                            "Expected maybe_sorted_awaited_action to have {sort_key:?}",
457
                        );
458
0
                    }
459
                }
460
54
                ActionEvent::ClientKeepAlive(client_id) => {
461
54
                    if let Some(client_awaited_action) = self
  Branch (461:28): [True: 0, False: 0]
  Branch (461:28): [Folded - Ignored]
  Branch (461:28): [True: 0, False: 0]
  Branch (461:28): [True: 54, False: 0]
462
54
                        .client_operation_to_awaited_action
463
54
                        .get(&client_id)
464
54
                        .await
465
                    {
466
54
                        if let Some(awaited_action_sender) = self
  Branch (466:32): [True: 0, False: 0]
  Branch (466:32): [Folded - Ignored]
  Branch (466:32): [True: 0, False: 0]
  Branch (466:32): [True: 54, False: 0]
467
54
                            .operation_id_to_awaited_action
468
54
                            .get(&client_awaited_action.operation_id)
469
                        {
470
54
                            awaited_action_sender.send_if_modified(|awaited_action| {
471
54
                                awaited_action.update_client_keep_alive((self.now_fn)().now());
472
54
                                false
473
54
                            });
474
0
                        }
475
                    } else {
476
0
                        error!(
477
                            ?client_id,
478
0
                            "client_operation_to_awaited_action does not have client_id",
479
                        );
480
                    }
481
                }
482
            }
483
        }
484
55
        NoEarlyReturn
485
55
    }
486
487
1
    fn get_awaited_actions_range(
488
1
        &self,
489
1
        start: Bound<&OperationId>,
490
1
        end: Bound<&OperationId>,
491
1
    ) -> impl Iterator<Item = (&'_ OperationId, MemoryAwaitedActionSubscriber<I, NowFn>)>
492
1
    + use<'_, I, NowFn> {
493
1
        self.operation_id_to_awaited_action
494
1
            .range((start, end))
495
1
            .map(|(operation_id, tx)| 
{0
496
0
                (
497
0
                    operation_id,
498
0
                    MemoryAwaitedActionSubscriber::<I, NowFn>::new(tx.subscribe()),
499
0
                )
500
0
            })
501
1
    }
502
503
584
    fn get_by_operation_id(
504
584
        &self,
505
584
        operation_id: &OperationId,
506
584
    ) -> Option<MemoryAwaitedActionSubscriber<I, NowFn>> {
507
584
        self.operation_id_to_awaited_action
508
584
            .get(operation_id)
509
584
            .map(|tx| 
MemoryAwaitedActionSubscriber::<I, NowFn>::new583
(
tx583
.
subscribe583
()))
510
584
    }
511
512
1.13k
    fn get_range_of_actions(
513
1.13k
        &self,
514
1.13k
        state: SortedAwaitedActionState,
515
1.13k
        range: impl RangeBounds<SortedAwaitedAction>,
516
1.13k
    ) -> impl DoubleEndedIterator<
517
1.13k
        Item = Result<
518
1.13k
            (
519
1.13k
                &SortedAwaitedAction,
520
1.13k
                MemoryAwaitedActionSubscriber<I, NowFn>,
521
1.13k
            ),
522
1.13k
            Error,
523
1.13k
        >,
524
1.13k
    > {
525
1.13k
        let btree = match state {
526
0
            SortedAwaitedActionState::CacheCheck => &self.sorted_action_info_hash_keys.cache_check,
527
1.13k
            SortedAwaitedActionState::Queued => &self.sorted_action_info_hash_keys.queued,
528
0
            SortedAwaitedActionState::Executing => &self.sorted_action_info_hash_keys.executing,
529
0
            SortedAwaitedActionState::Completed => &self.sorted_action_info_hash_keys.completed,
530
        };
531
1.13k
        btree.range(range).map(|sorted_awaited_action| 
{543
532
543
            let operation_id = &sorted_awaited_action.operation_id;
533
543
            self.get_by_operation_id(operation_id)
534
543
                .ok_or_else(|| 
{0
535
0
                    make_err!(
536
0
                        Code::Internal,
537
                        "Failed to get operation id {}",
538
                        operation_id
539
                    )
540
0
                })
541
543
                .map(|subscriber| (sorted_awaited_action, subscriber))
542
543
        })
543
1.13k
    }
544
545
39
    fn process_state_changes_for_hash_key_map(
546
39
        action_info_hash_key_to_awaited_action: &mut HashMap<ActionUniqueKey, OperationId>,
547
39
        new_awaited_action: &AwaitedAction,
548
39
    ) {
549
        // Only process changes if the stage is finished; unfinished stages are skipped.
550
39
        if !new_awaited_action.state().stage.is_finished() {
  Branch (550:12): [True: 0, False: 0]
  Branch (550:12): [Folded - Ignored]
  Branch (550:12): [True: 0, False: 0]
  Branch (550:12): [True: 33, False: 6]
551
33
            return;
552
6
        }
553
6
        match &new_awaited_action.action_info().unique_qualifier {
554
6
            ActionUniqueQualifier::Cacheable(action_key) => {
555
6
                let maybe_awaited_action =
556
6
                    action_info_hash_key_to_awaited_action.remove(action_key);
557
6
                match maybe_awaited_action {
558
6
                    Some(removed_operation_id) => {
559
6
                        if &removed_operation_id != new_awaited_action.operation_id() {
  Branch (559:28): [True: 0, False: 0]
  Branch (559:28): [Folded - Ignored]
  Branch (559:28): [True: 0, False: 0]
  Branch (559:28): [True: 0, False: 6]
560
0
                            error!(
561
                                ?removed_operation_id,
562
                                ?new_awaited_action,
563
                                ?action_key,
564
0
                                "action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync",
565
                            );
566
6
                        }
567
                    }
568
                    None => {
569
0
                        error!(
570
                            ?new_awaited_action,
571
                            ?action_key,
572
0
                            "action_info_hash_key_to_awaited_action out of sync, it should have had the unique_key",
573
                        );
574
                    }
575
                }
576
            }
577
0
            ActionUniqueQualifier::Uncacheable(_action_key) => {
578
0
                // If we are not cacheable, the action should not be in the
579
0
                // hash_key map, so we don't need to process anything in
580
0
                // action_info_hash_key_to_awaited_action.
581
0
            }
582
        }
583
39
    }
584
585
39
    fn update_awaited_action(
586
39
        &mut self,
587
39
        mut new_awaited_action: AwaitedAction,
588
39
    ) -> Result<(), Error> {
589
39
        let tx = self
590
39
            .operation_id_to_awaited_action
591
39
            .get(new_awaited_action.operation_id())
592
39
            .ok_or_else(|| 
{0
593
0
                make_err!(
594
0
                    Code::Internal,
595
                    "OperationId does not exist in map in AwaitedActionDb::update_awaited_action"
596
                )
597
0
            })?;
598
        {
599
            // Note: It's important to drop old_awaited_action before we call
600
            // send_replace or we will have a deadlock.
601
39
            let old_awaited_action = tx.borrow();
602
603
            // Do not process changes if the action version is not in sync with
604
            // what the sender based the update on.
605
39
            if old_awaited_action.version() != new_awaited_action.version() {
  Branch (605:16): [True: 0, False: 0]
  Branch (605:16): [Folded - Ignored]
  Branch (605:16): [True: 0, False: 0]
  Branch (605:16): [True: 0, False: 39]
606
0
                return Err(make_err!(
607
0
                    // From: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
608
0
                    // Use ABORTED if the client should retry at a higher level
609
0
                    // (e.g., when a client-specified test-and-set fails,
610
0
                    // indicating the client should restart a read-modify-write
611
0
                    // sequence)
612
0
                    Code::Aborted,
613
0
                    "{} Expected {} but got {} for operation_id {:?} - {:?}",
614
0
                    "Tried to update an awaited action with an incorrect version.",
615
0
                    old_awaited_action.version(),
616
0
                    new_awaited_action.version(),
617
0
                    old_awaited_action,
618
0
                    new_awaited_action,
619
0
                ));
620
39
            }
621
39
            new_awaited_action.increment_version();
622
623
0
            error_if!(
624
39
                old_awaited_action.action_info().unique_qualifier
  Branch (624:17): [True: 0, False: 0]
  Branch (624:17): [Folded - Ignored]
  Branch (624:17): [True: 0, False: 0]
  Branch (624:17): [True: 0, False: 39]
625
39
                    != new_awaited_action.action_info().unique_qualifier,
626
                "Unique key changed for operation_id {:?} - {:?} - {:?}",
627
0
                new_awaited_action.operation_id(),
628
0
                old_awaited_action.action_info(),
629
0
                new_awaited_action.action_info(),
630
            );
631
39
            let is_same_stage = old_awaited_action
632
39
                .state()
633
39
                .stage
634
39
                .is_same_stage(&new_awaited_action.state().stage);
635
636
39
            if !is_same_stage {
  Branch (636:16): [True: 0, False: 0]
  Branch (636:16): [Folded - Ignored]
  Branch (636:16): [True: 0, False: 0]
  Branch (636:16): [True: 39, False: 0]
637
                // Record metrics for stage transitions
638
39
                let metrics = &*EXECUTION_METRICS;
639
39
                let old_stage = &old_awaited_action.state().stage;
640
39
                let new_stage = &new_awaited_action.state().stage;
641
642
                // Track stage transitions
643
39
                let base_attrs = make_execution_attributes(
644
39
                    "unknown",
645
39
                    None,
646
39
                    Some(old_awaited_action.action_info().priority),
647
                );
648
39
                metrics.execution_stage_transitions.add(1, &base_attrs);
649
650
                // Update active count for old stage
651
39
                let old_stage_attrs = vec![opentelemetry::KeyValue::new(
652
                    nativelink_util::metrics::EXECUTION_STAGE,
653
39
                    ExecutionStage::from(old_stage),
654
                )];
655
39
                metrics.execution_active_count.add(-1, &old_stage_attrs);
656
657
                // Update active count for new stage
658
39
                let new_stage_attrs = vec![opentelemetry::KeyValue::new(
659
                    nativelink_util::metrics::EXECUTION_STAGE,
660
39
                    ExecutionStage::from(new_stage),
661
                )];
662
39
                metrics.execution_active_count.add(1, &new_stage_attrs);
663
664
                // Record completion metrics with action digest for failure tracking
665
39
                let action_digest = old_awaited_action.action_info().digest().to_string();
666
39
                if let ActionStage::Completed(
action_result6
) = new_stage {
  Branch (666:24): [True: 0, False: 0]
  Branch (666:24): [Folded - Ignored]
  Branch (666:24): [True: 0, False: 0]
  Branch (666:24): [True: 6, False: 33]
667
6
                    let result_attrs = vec![
668
6
                        opentelemetry::KeyValue::new(
669
                            nativelink_util::metrics::EXECUTION_RESULT,
670
6
                            if action_result.exit_code == 0 {
  Branch (670:32): [True: 0, False: 0]
  Branch (670:32): [Folded - Ignored]
  Branch (670:32): [True: 0, False: 0]
  Branch (670:32): [True: 5, False: 1]
671
5
                                ExecutionResult::Success
672
                            } else {
673
1
                                ExecutionResult::Failure
674
                            },
675
                        ),
676
6
                        opentelemetry::KeyValue::new(
677
                            nativelink_util::metrics::EXECUTION_ACTION_DIGEST,
678
6
                            action_digest,
679
                        ),
680
                    ];
681
6
                    metrics.execution_completed_count.add(1, &result_attrs);
682
33
                } else if let ActionStage::CompletedFromCache(_) = new_stage {
  Branch (682:31): [True: 0, False: 0]
  Branch (682:31): [Folded - Ignored]
  Branch (682:31): [True: 0, False: 0]
  Branch (682:31): [True: 0, False: 33]
683
0
                    let result_attrs = vec![
684
0
                        opentelemetry::KeyValue::new(
685
0
                            nativelink_util::metrics::EXECUTION_RESULT,
686
0
                            ExecutionResult::CacheHit,
687
0
                        ),
688
0
                        opentelemetry::KeyValue::new(
689
0
                            nativelink_util::metrics::EXECUTION_ACTION_DIGEST,
690
0
                            action_digest,
691
0
                        ),
692
0
                    ];
693
0
                    metrics.execution_completed_count.add(1, &result_attrs);
694
33
                }
695
696
39
                self.sorted_action_info_hash_keys
697
39
                    .process_state_changes(&old_awaited_action, &new_awaited_action)
?0
;
698
39
                Self::process_state_changes_for_hash_key_map(
699
39
                    &mut self.action_info_hash_key_to_awaited_action,
700
39
                    &new_awaited_action,
701
                );
702
0
            }
703
        }
704
705
        // Notify all listeners of the new state and ignore if no one is listening.
706
        // Note: Do not use `.send()` as it will not update the state if all listeners
707
        // are dropped.
708
39
        drop(tx.send_replace(new_awaited_action));
709
710
39
        Ok(())
711
39
    }
712
713
    /// Creates a new [`ClientAwaitedAction`] and a [`watch::Receiver`] to
714
    /// listen for changes. We don't do this in-line because it is important
715
    /// to ALWAYS construct a [`ClientAwaitedAction`] before inserting it into
716
    /// the map. Failing to do so may result in memory leaks. This is because
717
    /// [`ClientAwaitedAction`] implements a drop function that will trigger
718
    /// cleanup of the other maps on drop.
719
24
    fn make_client_awaited_action(
720
24
        &mut self,
721
24
        operation_id: &OperationId,
722
24
        awaited_action: AwaitedAction,
723
24
    ) -> (Arc<ClientAwaitedAction>, watch::Receiver<AwaitedAction>) {
724
24
        let (tx, rx) = watch::channel(awaited_action);
725
24
        let client_awaited_action = Arc::new(ClientAwaitedAction::new(
726
24
            operation_id.clone(),
727
24
            self.action_event_tx.clone(),
728
        ));
729
24
        self.operation_id_to_awaited_action
730
24
            .insert(operation_id.clone(), tx);
731
24
        self.connected_clients_for_operation_id
732
24
            .insert(operation_id.clone(), 1);
733
24
        (client_awaited_action, rx)
734
24
    }
735
736
27
    async fn add_action(
737
27
        &mut self,
738
27
        client_operation_id: OperationId,
739
27
        action_info: Arc<ActionInfo>,
740
27
    ) -> Result<MemoryAwaitedActionSubscriber<I, NowFn>, Error> {
741
        // Check to see if the action is already known and subscribe if it is.
742
27
        let subscription_result = self
743
27
            .try_subscribe(
744
27
                &client_operation_id,
745
27
                &action_info.unique_qualifier,
746
27
                action_info.priority,
747
27
            )
748
27
            .await
749
27
            .err_tip(|| "In AwaitedActionDb::subscribe_or_add_action");
750
27
        match subscription_result {
751
0
            Err(err) => return Err(err),
752
3
            Ok(Some(subscription)) => return Ok(subscription),
753
24
            Ok(None) => { /* Add item to queue. */ }
754
        }
755
756
24
        let maybe_unique_key = match &action_info.unique_qualifier {
757
24
            ActionUniqueQualifier::Cacheable(unique_key) => Some(unique_key.clone()),
758
0
            ActionUniqueQualifier::Uncacheable(_unique_key) => None,
759
        };
760
24
        let operation_id = OperationId::default();
761
24
        let awaited_action = AwaitedAction::new(
762
24
            operation_id.clone(),
763
24
            action_info.clone(),
764
24
            (self.now_fn)().now(),
765
        );
766
24
        debug_assert!(
767
0
            ActionStage::Queued == awaited_action.state().stage,
768
0
            "Expected action to be queued"
769
        );
770
24
        let sort_key = awaited_action.sort_key();
771
772
24
        let (client_awaited_action, rx) =
773
24
            self.make_client_awaited_action(&operation_id.clone(), awaited_action);
774
775
24
        debug!(
776
            %client_operation_id,
777
            %operation_id,
778
            ?client_awaited_action,
779
24
            "Adding action"
780
        );
781
782
24
        self.client_operation_to_awaited_action
783
24
            .insert(client_operation_id.clone(), client_awaited_action)
784
24
            .await;
785
786
        // Note: We only put items in the map that are cacheable.
787
24
        if let Some(unique_key) = maybe_unique_key {
  Branch (787:16): [True: 0, False: 0]
  Branch (787:16): [Folded - Ignored]
  Branch (787:16): [True: 24, False: 0]
788
24
            let old_value = self
789
24
                .action_info_hash_key_to_awaited_action
790
24
                .insert(unique_key, operation_id.clone());
791
24
            if let Some(
old_value0
) = old_value {
  Branch (791:20): [True: 0, False: 0]
  Branch (791:20): [Folded - Ignored]
  Branch (791:20): [True: 0, False: 24]
792
0
                error!(
793
                    %operation_id,
794
                    ?old_value,
795
0
                    "action_info_hash_key_to_awaited_action already has unique_key"
796
                );
797
24
            }
798
0
        }
799
800
        // Record metric for new action entering the queue
801
24
        let metrics = &*EXECUTION_METRICS;
802
24
        let _base_attrs = make_execution_attributes("unknown", None, Some(action_info.priority));
803
24
        let queued_attrs = vec![opentelemetry::KeyValue::new(
804
            nativelink_util::metrics::EXECUTION_STAGE,
805
24
            ExecutionStage::Queued,
806
        )];
807
24
        metrics.execution_active_count.add(1, &queued_attrs);
808
809
24
        self.sorted_action_info_hash_keys
810
24
            .insert_sort_map_for_stage(
811
24
                &ActionStage::Queued,
812
24
                &SortedAwaitedAction {
813
24
                    sort_key,
814
24
                    operation_id,
815
24
                },
816
            )
817
24
            .err_tip(|| "In AwaitedActionDb::subscribe_or_add_action")
?0
;
818
819
24
        Ok(MemoryAwaitedActionSubscriber::new_with_client(
820
24
            rx,
821
24
            client_operation_id,
822
24
            self.action_event_tx.clone(),
823
24
            self.now_fn.clone(),
824
24
        ))
825
27
    }
826
827
27
    async fn try_subscribe(
828
27
        &mut self,
829
27
        client_operation_id: &OperationId,
830
27
        unique_qualifier: &ActionUniqueQualifier,
831
27
        // TODO(palfrey) To simplify the scheduler 2024 refactor, we
832
27
        // removed the ability to upgrade priorities of actions.
833
27
        // we should add priority upgrades back in.
834
27
        _priority: i32,
835
27
    ) -> Result<Option<MemoryAwaitedActionSubscriber<I, NowFn>>, Error> {
836
27
        let unique_key = match unique_qualifier {
837
27
            ActionUniqueQualifier::Cacheable(unique_key) => unique_key,
838
0
            ActionUniqueQualifier::Uncacheable(_unique_key) => return Ok(None),
839
        };
840
841
27
        let Some(
operation_id3
) = self.action_info_hash_key_to_awaited_action.get(unique_key) else {
  Branch (841:13): [True: 0, False: 0]
  Branch (841:13): [Folded - Ignored]
  Branch (841:13): [True: 3, False: 24]
842
24
            return Ok(None); // Not currently running.
843
        };
844
845
3
        let Some(tx) = self.operation_id_to_awaited_action.get(operation_id) else {
  Branch (845:13): [True: 0, False: 0]
  Branch (845:13): [Folded - Ignored]
  Branch (845:13): [True: 3, False: 0]
846
0
            return Err(make_err!(
847
0
                Code::Internal,
848
0
                "operation_id_to_awaited_action and action_info_hash_key_to_awaited_action are out of sync for {unique_key:?} - {operation_id}"
849
0
            ));
850
        };
851
852
0
        error_if!(
853
3
            tx.borrow().state().stage.is_finished(),
  Branch (853:13): [True: 0, False: 0]
  Branch (853:13): [Folded - Ignored]
  Branch (853:13): [True: 0, False: 3]
854
            "Tried to subscribe to a completed action but it already finished. This should never happen. {:?}",
855
0
            tx.borrow()
856
        );
857
858
3
        let maybe_connected_clients = self
859
3
            .connected_clients_for_operation_id
860
3
            .get_mut(operation_id);
861
3
        let Some(connected_clients) = maybe_connected_clients else {
  Branch (861:13): [True: 0, False: 0]
  Branch (861:13): [Folded - Ignored]
  Branch (861:13): [True: 3, False: 0]
862
0
            return Err(make_err!(
863
0
                Code::Internal,
864
0
                "connected_clients_for_operation_id and operation_id_to_awaited_action are out of sync for {unique_key:?} - {operation_id}"
865
0
            ));
866
        };
867
3
        *connected_clients += 1;
868
869
        // Immediately mark the keep alive, we don't need to wake anyone
870
        // so we always fake that it was not actually changed.
871
        // Failing update the client could lead to the client connecting
872
        // then not updating the keep alive in time, resulting in the
873
        // operation timing out due to async behavior.
874
3
        tx.send_if_modified(|awaited_action| {
875
3
            awaited_action.update_client_keep_alive((self.now_fn)().now());
876
3
            false
877
3
        });
878
3
        let subscription = tx.subscribe();
879
880
3
        self.client_operation_to_awaited_action
881
3
            .insert(
882
3
                client_operation_id.clone(),
883
3
                Arc::new(ClientAwaitedAction::new(
884
3
                    operation_id.clone(),
885
3
                    self.action_event_tx.clone(),
886
3
                )),
887
3
            )
888
3
            .await;
889
890
3
        Ok(Some(MemoryAwaitedActionSubscriber::new_with_client(
891
3
            subscription,
892
3
            client_operation_id.clone(),
893
3
            self.action_event_tx.clone(),
894
3
            self.now_fn.clone(),
895
3
        )))
896
27
    }
897
}
898
899
/// In-memory awaited-action database.
///
/// All state lives in a shared, mutex-guarded `AwaitedActionDbImpl`; the
/// methods on this type lock it and delegate to it.
#[derive(Debug, MetricsComponent)]
900
pub struct MemoryAwaitedActionDb<I: InstantWrapper, NowFn: Fn() -> I> {
901
    // Shared implementation state. The background task spawned in `new`
    // only holds a weak reference to it.
    #[metric]
902
    inner: Arc<Mutex<AwaitedActionDbImpl<I, NowFn>>>,
903
    // Signalled with `notify_one` after `add_action` and
    // `update_awaited_action` succeed.
    tasks_change_notify: Arc<Notify>,
904
    // Guard for the background task spawned in `new` that forwards received
    // action events to `handle_action_events`.
    // NOTE(review): presumably the task is stopped when this guard is
    // dropped - confirm against `JoinHandleDropGuard`.
    _handle_awaited_action_events: JoinHandleDropGuard<()>,
905
}
906
907
impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static>
908
    MemoryAwaitedActionDb<I, NowFn>
909
{
910
22
    pub fn new(
911
22
        eviction_config: &EvictionPolicy,
912
22
        tasks_change_notify: Arc<Notify>,
913
22
        now_fn: NowFn,
914
22
    ) -> Self {
915
22
        let (action_event_tx, mut action_event_rx) = mpsc::unbounded_channel();
916
22
        let inner = Arc::new(Mutex::new(AwaitedActionDbImpl {
917
22
            client_operation_to_awaited_action: EvictingMap::new(eviction_config, (now_fn)()),
918
22
            operation_id_to_awaited_action: BTreeMap::new(),
919
22
            action_info_hash_key_to_awaited_action: HashMap::new(),
920
22
            sorted_action_info_hash_keys: SortedAwaitedActions::default(),
921
22
            connected_clients_for_operation_id: HashMap::new(),
922
22
            action_event_tx,
923
22
            now_fn,
924
22
        }));
925
22
        let weak_inner = Arc::downgrade(&inner);
926
        Self {
927
22
            inner,
928
22
            tasks_change_notify,
929
22
            _handle_awaited_action_events: spawn!("handle_awaited_action_events", async move 
{20
930
20
                let mut dropped_operation_ids = Vec::with_capacity(MAX_ACTION_EVENTS_RX_PER_CYCLE);
931
                loop {
932
75
                    dropped_operation_ids.clear();
933
75
                    action_event_rx
934
75
                        .recv_many(&mut dropped_operation_ids, MAX_ACTION_EVENTS_RX_PER_CYCLE)
935
75
                        .await;
936
55
                    let Some(inner) = weak_inner.upgrade() else {
  Branch (936:25): [True: 0, False: 0]
  Branch (936:25): [Folded - Ignored]
  Branch (936:25): [True: 0, False: 0]
  Branch (936:25): [True: 55, False: 0]
937
0
                        return; // Nothing to cleanup, our struct is dropped.
938
                    };
939
55
                    let mut inner = inner.lock().await;
940
55
                    inner
941
55
                        .handle_action_events(dropped_operation_ids.drain(..))
942
55
                        .await;
943
                }
944
0
            }),
945
        }
946
22
    }
947
}
948
949
impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> AwaitedActionDb
950
    for MemoryAwaitedActionDb<I, NowFn>
951
{
952
    type Subscriber = MemoryAwaitedActionSubscriber<I, NowFn>;
953
954
3
    async fn get_awaited_action_by_id(
955
3
        &self,
956
3
        client_operation_id: &OperationId,
957
3
    ) -> Result<Option<Self::Subscriber>, Error> {
958
3
        self.inner
959
3
            .lock()
960
3
            .await
961
3
            .get_awaited_action_by_id(client_operation_id)
962
3
            .await
963
3
    }
964
965
1
    async fn get_all_awaited_actions(
966
1
        &self,
967
1
    ) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>>, Error> {
968
1
        Ok(ChunkedStream::new(
969
1
            Bound::Unbounded,
970
1
            Bound::Unbounded,
971
1
            move |start, end, mut output| async move {
972
1
                let inner = self.inner.lock().await;
973
1
                let mut maybe_new_start = None;
974
975
0
                for (operation_id, item) in
976
1
                    inner.get_awaited_actions_range(start.as_ref(), end.as_ref())
977
0
                {
978
0
                    output.push_back(item);
979
0
                    maybe_new_start = Some(operation_id);
980
0
                }
981
982
1
                Ok(maybe_new_start
983
1
                    .map(|new_start| (
(Bound::Excluded(new_start.clone()), end)0
,
output0
)))
984
2
            },
985
        ))
986
1
    }
987
988
41
    async fn get_by_operation_id(
989
41
        &self,
990
41
        operation_id: &OperationId,
991
41
    ) -> Result<Option<Self::Subscriber>, Error> {
992
41
        Ok(self.inner.lock().await.get_by_operation_id(operation_id))
993
41
    }
994
995
590
    async fn get_range_of_actions(
996
590
        &self,
997
590
        state: SortedAwaitedActionState,
998
590
        start: Bound<SortedAwaitedAction>,
999
590
        end: Bound<SortedAwaitedAction>,
1000
590
        desc: bool,
1001
590
    ) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>> + Send, Error> {
1002
590
        Ok(ChunkedStream::new(
1003
590
            start,
1004
590
            end,
1005
1.13k
            move |start, end, mut output| async move {
1006
1.13k
                let inner = self.inner.lock().await;
1007
1.13k
                let mut done = true;
1008
1.13k
                let mut new_start = start.as_ref();
1009
1.13k
                let mut new_end = end.as_ref();
1010
1011
1.13k
                let iterator = inner
1012
1.13k
                    .get_range_of_actions(state, (start.as_ref(), end.as_ref()))
1013
1.13k
                    .map(|res| 
res543
.
err_tip543
(|| "In AwaitedActionDb::get_range_of_actions"));
1014
1015
                // TODO(palfrey) This should probably use the `.left()/right()` pattern,
1016
                // but that doesn't exist in the std or any libraries we use.
1017
1.13k
                if desc {
  Branch (1017:20): [True: 0, False: 0]
  Branch (1017:20): [Folded - Ignored]
  Branch (1017:20): [True: 130, False: 1.00k]
1018
130
                    for 
result43
in iterator.rev() {
1019
43
                        let (sorted_awaited_action, item) =
1020
43
                            result.err_tip(|| "In AwaitedActionDb::get_range_of_actions")
?0
;
1021
43
                        output.push_back(item);
1022
43
                        new_end = Bound::Excluded(sorted_awaited_action);
1023
43
                        done = false;
1024
                    }
1025
                } else {
1026
1.50k
                    for 
result500
in iterator {
1027
500
                        let (sorted_awaited_action, item) =
1028
500
                            result.err_tip(|| "In AwaitedActionDb::get_range_of_actions")
?0
;
1029
500
                        output.push_back(item);
1030
500
                        new_start = Bound::Excluded(sorted_awaited_action);
1031
500
                        done = false;
1032
                    }
1033
                }
1034
1.13k
                if done {
  Branch (1034:20): [True: 0, False: 0]
  Branch (1034:20): [Folded - Ignored]
  Branch (1034:20): [True: 590, False: 540]
1035
590
                    return Ok(None);
1036
540
                }
1037
540
                Ok(Some(((new_start.cloned(), new_end.cloned()), output)))
1038
2.26k
            },
1039
        ))
1040
590
    }
1041
1042
39
    async fn update_awaited_action(&self, new_awaited_action: AwaitedAction) -> Result<(), Error> {
1043
39
        self.inner
1044
39
            .lock()
1045
39
            .await
1046
39
            .update_awaited_action(new_awaited_action)
?0
;
1047
39
        self.tasks_change_notify.notify_one();
1048
39
        Ok(())
1049
39
    }
1050
1051
27
    async fn add_action(
1052
27
        &self,
1053
27
        client_operation_id: OperationId,
1054
27
        action_info: Arc<ActionInfo>,
1055
27
        _no_event_action_timeout: Duration,
1056
27
    ) -> Result<Self::Subscriber, Error> {
1057
27
        let subscriber = self
1058
27
            .inner
1059
27
            .lock()
1060
27
            .await
1061
27
            .add_action(client_operation_id, action_info)
1062
27
            .await
?0
;
1063
27
        self.tasks_change_notify.notify_one();
1064
27
        Ok(subscriber)
1065
27
    }
1066
}