/build/source/nativelink-scheduler/src/memory_awaited_action_db.rs
Line  | Count  | Source  | 
1  |  | // Copyright 2024 The NativeLink Authors. All rights reserved.  | 
2  |  | //  | 
3  |  | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");  | 
4  |  | // you may not use this file except in compliance with the License.  | 
5  |  | // You may obtain a copy of the License at  | 
6  |  | //  | 
7  |  | //    See LICENSE file for details  | 
8  |  | //  | 
9  |  | // Unless required by applicable law or agreed to in writing, software  | 
10  |  | // distributed under the License is distributed on an "AS IS" BASIS,  | 
11  |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
12  |  | // See the License for the specific language governing permissions and  | 
13  |  | // limitations under the License.  | 
14  |  |  | 
15  |  | use core::ops::{Bound, RangeBounds}; | 
16  |  | use core::time::Duration;  | 
17  |  | use std::collections::hash_map::Entry;  | 
18  |  | use std::collections::{BTreeMap, BTreeSet, HashMap}; | 
19  |  | use std::sync::Arc;  | 
20  |  |  | 
21  |  | use async_lock::Mutex;  | 
22  |  | use futures::{FutureExt, Stream}; | 
23  |  | use nativelink_config::stores::EvictionPolicy;  | 
24  |  | use nativelink_error::{Code, Error, ResultExt, error_if, make_err}; | 
25  |  | use nativelink_metric::MetricsComponent;  | 
26  |  | use nativelink_util::action_messages::{ | 
27  |  |     ActionInfo, ActionStage, ActionUniqueKey, ActionUniqueQualifier, OperationId,  | 
28  |  | };  | 
29  |  | use nativelink_util::chunked_stream::ChunkedStream;  | 
30  |  | use nativelink_util::evicting_map::{EvictingMap, LenEntry}; | 
31  |  | use nativelink_util::instant_wrapper::InstantWrapper;  | 
32  |  | use nativelink_util::spawn;  | 
33  |  | use nativelink_util::task::JoinHandleDropGuard;  | 
34  |  | use tokio::sync::{Notify, mpsc, watch}; | 
35  |  | use tracing::{debug, error}; | 
36  |  |  | 
37  |  | use crate::awaited_action_db::{ | 
38  |  |     AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, CLIENT_KEEPALIVE_DURATION,  | 
39  |  |     SortedAwaitedAction, SortedAwaitedActionState,  | 
40  |  | };  | 
41  |  |  | 
/// Maximum number of action events to process per cycle.
const MAX_ACTION_EVENTS_RX_PER_CYCLE: usize = 1024;
44  |  |  | 
45  |  | /// Represents a client that is currently listening to an action.  | 
46  |  | /// When the client is dropped, it will send the `AwaitedAction` to the  | 
47  |  | /// `event_tx` if there are other cleanups needed.  | 
48  |  | #[derive(Debug)]  | 
49  |  | struct ClientAwaitedAction { | 
50  |  |     /// The `OperationId` that the client is listening to.  | 
51  |  |     operation_id: OperationId,  | 
52  |  |  | 
53  |  |     /// The sender to notify of this struct being dropped.  | 
54  |  |     event_tx: mpsc::UnboundedSender<ActionEvent>,  | 
55  |  | }  | 
56  |  |  | 
57  |  | impl ClientAwaitedAction { | 
58  | 27  |     pub(crate) const fn new(  | 
59  | 27  |         operation_id: OperationId,  | 
60  | 27  |         event_tx: mpsc::UnboundedSender<ActionEvent>,  | 
61  | 27  |     ) -> Self { | 
62  | 27  |         Self { | 
63  | 27  |             operation_id,  | 
64  | 27  |             event_tx,  | 
65  | 27  |         }  | 
66  | 27  |     }  | 
67  |  |  | 
68  | 3  |     pub(crate) const fn operation_id(&self) -> &OperationId { | 
69  | 3  |         &self.operation_id  | 
70  | 3  |     }  | 
71  |  | }  | 
72  |  |  | 
73  |  | impl Drop for ClientAwaitedAction { | 
74  | 27  |     fn drop(&mut self) { | 
75  |  |         // If we failed to send it means noone is listening.  | 
76  | 27  |         drop(self.event_tx.send(ActionEvent::ClientDroppedOperation(  | 
77  | 27  |             self.operation_id.clone(),  | 
78  | 27  |         )));  | 
79  | 27  |     }  | 
80  |  | }  | 
81  |  |  | 
82  |  | /// Trait to be able to use the `EvictingMap` with `ClientAwaitedAction`.  | 
83  |  | /// Note: We only use `EvictingMap` for a time based eviction, which is  | 
84  |  | /// why the implementation has fixed default values in it.  | 
85  |  | impl LenEntry for ClientAwaitedAction { | 
86  |  |     #[inline]  | 
87  | 29  |     fn len(&self) -> u64 { | 
88  | 29  |         0  | 
89  | 29  |     }  | 
90  |  |  | 
91  |  |     #[inline]  | 
92  | 0  |     fn is_empty(&self) -> bool { | 
93  | 0  |         true  | 
94  | 0  |     }  | 
95  |  | }  | 
96  |  |  | 
97  |  | /// Actions the `AwaitedActionsDb` needs to process.  | 
98  |  | #[derive(Debug)]  | 
99  |  | pub(crate) enum ActionEvent { | 
100  |  |     /// A client has sent a keep alive message.  | 
101  |  |     ClientKeepAlive(OperationId),  | 
102  |  |     /// A client has dropped and pointed to `OperationId`.  | 
103  |  |     ClientDroppedOperation(OperationId),  | 
104  |  | }  | 
105  |  |  | 
106  |  | /// Information required to track an individual client  | 
107  |  | /// keep alive config and state.  | 
108  |  | #[derive(Debug)]  | 
109  |  | struct ClientInfo<I: InstantWrapper, NowFn: Fn() -> I> { | 
110  |  |     /// The client operation id.  | 
111  |  |     client_operation_id: OperationId,  | 
112  |  |     /// The last time a keep alive was sent.  | 
113  |  |     last_keep_alive: I,  | 
114  |  |     /// The function to get the current time.  | 
115  |  |     now_fn: NowFn,  | 
116  |  |     /// The sender to notify of this struct had an event.  | 
117  |  |     event_tx: mpsc::UnboundedSender<ActionEvent>,  | 
118  |  | }  | 
119  |  |  | 
120  |  | /// Subscriber that clients can be used to monitor when `AwaitedActions` change.  | 
121  |  | #[derive(Debug)]  | 
122  |  | pub struct MemoryAwaitedActionSubscriber<I: InstantWrapper, NowFn: Fn() -> I> { | 
123  |  |     /// The receiver to listen for changes.  | 
124  |  |     awaited_action_rx: watch::Receiver<AwaitedAction>,  | 
125  |  |     /// If a client id is known this is the info needed to keep the client  | 
126  |  |     /// action alive.  | 
127  |  |     client_info: Option<ClientInfo<I, NowFn>>,  | 
128  |  | }  | 
129  |  |  | 
130  |  | impl<I: InstantWrapper, NowFn: Fn() -> I> MemoryAwaitedActionSubscriber<I, NowFn> { | 
131  | 583  |     fn new(mut awaited_action_rx: watch::Receiver<AwaitedAction>) -> Self { | 
132  | 583  |         awaited_action_rx.mark_changed();  | 
133  | 583  |         Self { | 
134  | 583  |             awaited_action_rx,  | 
135  | 583  |             client_info: None,  | 
136  | 583  |         }  | 
137  | 583  |     }  | 
138  |  |  | 
139  | 30  |     fn new_with_client(  | 
140  | 30  |         mut awaited_action_rx: watch::Receiver<AwaitedAction>,  | 
141  | 30  |         client_operation_id: OperationId,  | 
142  | 30  |         event_tx: mpsc::UnboundedSender<ActionEvent>,  | 
143  | 30  |         now_fn: NowFn,  | 
144  | 30  |     ) -> Self  | 
145  | 30  |     where  | 
146  | 30  |         NowFn: Fn() -> I,  | 
147  |  |     { | 
148  | 30  |         awaited_action_rx.mark_changed();  | 
149  | 30  |         Self { | 
150  | 30  |             awaited_action_rx,  | 
151  | 30  |             client_info: Some(ClientInfo { | 
152  | 30  |                 client_operation_id,  | 
153  | 30  |                 last_keep_alive: I::from_secs(0),  | 
154  | 30  |                 now_fn,  | 
155  | 30  |                 event_tx,  | 
156  | 30  |             }),  | 
157  | 30  |         }  | 
158  | 30  |     }  | 
159  |  | }  | 
160  |  |  | 
161  |  | impl<I, NowFn> AwaitedActionSubscriber for MemoryAwaitedActionSubscriber<I, NowFn>  | 
162  |  | where  | 
163  |  |     I: InstantWrapper,  | 
164  |  |     NowFn: Fn() -> I + Send + Sync + 'static,  | 
165  |  | { | 
166  | 58  |     async fn changed(&mut self) -> Result<AwaitedAction, Error> { | 
167  | 44  |         let client_operation_id = { | 
168  | 58  |             let changed_fut = self.awaited_action_rx.changed().map(|r| {44   | 
169  | 44  |                 r.map_err(|e| {0   | 
170  | 0  |                     make_err!(  | 
171  | 0  |                         Code::Internal,  | 
172  |  |                         "Failed to wait for awaited action to change {e:?}" | 
173  |  |                     )  | 
174  | 0  |                 })  | 
175  | 44  |             });  | 
176  | 58  |             let Some(client_info) = self.client_info.as_mut() else {  Branch (176:17): [True: 0, False: 0]
   Branch (176:17): [Folded - Ignored]
   Branch (176:17): [True: 58, False: 0]
  | 
177  | 0  |                 changed_fut.await?;  | 
178  | 0  |                 return Ok(self.awaited_action_rx.borrow().clone());  | 
179  |  |             };  | 
180  | 58  |             tokio::pin!(changed_fut);  | 
181  |  |             loop { | 
182  | 163  |                 if client_info.last_keep_alive.elapsed() > CLIENT_KEEPALIVE_DURATION {  Branch (182:20): [True: 0, False: 0]
   Branch (182:20): [Folded - Ignored]
   Branch (182:20): [True: 55, False: 108]
  | 
183  | 55  |                     client_info.last_keep_alive = (client_info.now_fn)();  | 
184  | 55  |                     // Failing to send just means our receiver dropped.  | 
185  | 55  |                     drop(client_info.event_tx.send(ActionEvent::ClientKeepAlive(  | 
186  | 55  |                         client_info.client_operation_id.clone(),  | 
187  | 55  |                     )));  | 
188  | 108  |                 }  | 
189  | 163  |                 let sleep_fut = (client_info.now_fn)().sleep(CLIENT_KEEPALIVE_DURATION);  | 
190  | 163  |                 tokio::select! { | 
191  | 163  |                     result44  = &mut changed_fut => {  | 
192  | 44  |                         result?0 ;  | 
193  | 44  |                         break;  | 
194  |  |                     }  | 
195  | 163  |                     () = sleep_fut => { | 
196  | 105  |                         // If we haven't received any updates for a while, we should  | 
197  | 105  |                         // let the database know that we are still listening to prevent  | 
198  | 105  |                         // the action from being dropped.  | 
199  | 105  |                     }  | 
200  |  |                 }  | 
201  |  |             }  | 
202  | 44  |             client_info.client_operation_id.clone()  | 
203  |  |         };  | 
204  |  |         // At this stage we know that this event is a client request, so we need  | 
205  |  |         // to populate the client_operation_id.  | 
206  | 44  |         let mut awaited_action = self.awaited_action_rx.borrow().clone();  | 
207  | 44  |         awaited_action.set_client_operation_id(client_operation_id);  | 
208  | 44  |         Ok(awaited_action)  | 
209  | 44  |     }  | 
210  |  |  | 
211  | 668  |     async fn borrow(&self) -> Result<AwaitedAction, Error> { | 
212  | 668  |         let mut awaited_action = self.awaited_action_rx.borrow().clone();  | 
213  | 668  |         if let Some(client_info16 ) = self.client_info.as_ref() {   Branch (213:16): [True: 0, False: 0]
   Branch (213:16): [Folded - Ignored]
   Branch (213:16): [True: 16, False: 652]
  | 
214  | 16  |             awaited_action.set_client_operation_id(client_info.client_operation_id.clone());  | 
215  | 652  |         }  | 
216  | 668  |         Ok(awaited_action)  | 
217  | 668  |     }  | 
218  |  | }  | 
219  |  |  | 
/// Marker return type that discourages the developer from returning
/// early out of a function.
struct NoEarlyReturn;
223  |  |  | 
224  |  | #[derive(Debug, Default, MetricsComponent)]  | 
225  |  | struct SortedAwaitedActions { | 
226  |  |     #[metric(group = "unknown")]  | 
227  |  |     unknown: BTreeSet<SortedAwaitedAction>,  | 
228  |  |     #[metric(group = "cache_check")]  | 
229  |  |     cache_check: BTreeSet<SortedAwaitedAction>,  | 
230  |  |     #[metric(group = "queued")]  | 
231  |  |     queued: BTreeSet<SortedAwaitedAction>,  | 
232  |  |     #[metric(group = "executing")]  | 
233  |  |     executing: BTreeSet<SortedAwaitedAction>,  | 
234  |  |     #[metric(group = "completed")]  | 
235  |  |     completed: BTreeSet<SortedAwaitedAction>,  | 
236  |  | }  | 
237  |  |  | 
238  |  | impl SortedAwaitedActions { | 
239  | 39  |     const fn btree_for_state(&mut self, state: &ActionStage) -> &mut BTreeSet<SortedAwaitedAction> { | 
240  | 39  |         match state { | 
241  | 0  |             ActionStage::Unknown => &mut self.unknown,  | 
242  | 0  |             ActionStage::CacheCheck => &mut self.cache_check,  | 
243  | 26  |             ActionStage::Queued => &mut self.queued,  | 
244  | 13  |             ActionStage::Executing => &mut self.executing,  | 
245  | 0  |             ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => &mut self.completed,  | 
246  |  |         }  | 
247  | 39  |     }  | 
248  |  |  | 
249  | 63  |     fn insert_sort_map_for_stage(  | 
250  | 63  |         &mut self,  | 
251  | 63  |         stage: &ActionStage,  | 
252  | 63  |         sorted_awaited_action: &SortedAwaitedAction,  | 
253  | 63  |     ) -> Result<(), Error> { | 
254  | 63  |         let newly_inserted = match stage { | 
255  | 0  |             ActionStage::Unknown => self.unknown.insert(sorted_awaited_action.clone()),  | 
256  | 0  |             ActionStage::CacheCheck => self.cache_check.insert(sorted_awaited_action.clone()),  | 
257  | 31  |             ActionStage::Queued => self.queued.insert(sorted_awaited_action.clone()),  | 
258  | 26  |             ActionStage::Executing => self.executing.insert(sorted_awaited_action.clone()),  | 
259  | 6  |             ActionStage::Completed(_) => self.completed.insert(sorted_awaited_action.clone()),  | 
260  |  |             ActionStage::CompletedFromCache(_) => { | 
261  | 0  |                 self.completed.insert(sorted_awaited_action.clone())  | 
262  |  |             }  | 
263  |  |         };  | 
264  | 63  |         if !newly_inserted {  Branch (264:12): [True: 0, False: 63]
   Branch (264:12): [Folded - Ignored]
  | 
265  | 0  |             return Err(make_err!(  | 
266  | 0  |                 Code::Internal,  | 
267  | 0  |                 "Tried to insert an action that was already in the sorted map. This should never happen. {:?} - {:?}", | 
268  | 0  |                 stage,  | 
269  | 0  |                 sorted_awaited_action  | 
270  | 0  |             ));  | 
271  | 63  |         }  | 
272  | 63  |         Ok(())  | 
273  | 63  |     }  | 
274  |  |  | 
275  | 39  |     fn process_state_changes(  | 
276  | 39  |         &mut self,  | 
277  | 39  |         old_awaited_action: &AwaitedAction,  | 
278  | 39  |         new_awaited_action: &AwaitedAction,  | 
279  | 39  |     ) -> Result<(), Error> { | 
280  | 39  |         let btree = self.btree_for_state(&old_awaited_action.state().stage);  | 
281  | 39  |         let maybe_sorted_awaited_action = btree.take(&SortedAwaitedAction { | 
282  | 39  |             sort_key: old_awaited_action.sort_key(),  | 
283  | 39  |             operation_id: new_awaited_action.operation_id().clone(),  | 
284  | 39  |         });  | 
285  |  |  | 
286  | 39  |         let Some(sorted_awaited_action) = maybe_sorted_awaited_action else {  Branch (286:13): [True: 39, False: 0]
   Branch (286:13): [Folded - Ignored]
  | 
287  | 0  |             return Err(make_err!(  | 
288  | 0  |                 Code::Internal,  | 
289  | 0  |                 "sorted_action_info_hash_keys and action_info_hash_key_to_awaited_action are out of sync - {} - {:?}", | 
290  | 0  |                 new_awaited_action.operation_id(),  | 
291  | 0  |                 new_awaited_action,  | 
292  | 0  |             ));  | 
293  |  |         };  | 
294  |  |  | 
295  | 39  |         self.insert_sort_map_for_stage(&new_awaited_action.state().stage, &sorted_awaited_action)  | 
296  | 39  |             .err_tip(|| "In AwaitedActionDb::update_awaited_action")?0 ;  | 
297  | 39  |         Ok(())  | 
298  | 39  |     }  | 
299  |  | }  | 
300  |  |  | 
301  |  | /// The database for storing the state of all actions.  | 
302  |  | #[derive(Debug, MetricsComponent)]  | 
303  |  | pub struct AwaitedActionDbImpl<I: InstantWrapper, NowFn: Fn() -> I> { | 
304  |  |     /// A lookup table to lookup the state of an action by its client operation id.  | 
305  |  |     #[metric(group = "client_operation_ids")]  | 
306  |  |     client_operation_to_awaited_action:  | 
307  |  |         EvictingMap<OperationId, OperationId, Arc<ClientAwaitedAction>, I>,  | 
308  |  |  | 
309  |  |     /// A lookup table to lookup the state of an action by its worker operation id.  | 
310  |  |     #[metric(group = "operation_ids")]  | 
311  |  |     operation_id_to_awaited_action: BTreeMap<OperationId, watch::Sender<AwaitedAction>>,  | 
312  |  |  | 
313  |  |     /// A lookup table to lookup the state of an action by its unique qualifier.  | 
314  |  |     #[metric(group = "action_info_hash_key_to_awaited_action")]  | 
315  |  |     action_info_hash_key_to_awaited_action: HashMap<ActionUniqueKey, OperationId>,  | 
316  |  |  | 
317  |  |     /// A sorted set of [`AwaitedAction`]s. A wrapper is used to perform sorting  | 
318  |  |     /// based on the [`AwaitedActionSortKey`] of the [`AwaitedAction`].  | 
319  |  |     ///  | 
320  |  |     /// See [`AwaitedActionSortKey`] for more information on the ordering.  | 
321  |  |     #[metric(group = "sorted_action_infos")]  | 
322  |  |     sorted_action_info_hash_keys: SortedAwaitedActions,  | 
323  |  |  | 
324  |  |     /// The number of connected clients for each operation id.  | 
325  |  |     #[metric(group = "connected_clients_for_operation_id")]  | 
326  |  |     connected_clients_for_operation_id: HashMap<OperationId, usize>,  | 
327  |  |  | 
328  |  |     /// Where to send notifications about important events related to actions.  | 
329  |  |     action_event_tx: mpsc::UnboundedSender<ActionEvent>,  | 
330  |  |  | 
331  |  |     /// The function to get the current time.  | 
332  |  |     now_fn: NowFn,  | 
333  |  | }  | 
334  |  |  | 
335  |  | impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbImpl<I, NowFn> { | 
336  | 3  |     async fn get_awaited_action_by_id(  | 
337  | 3  |         &self,  | 
338  | 3  |         client_operation_id: &OperationId,  | 
339  | 3  |     ) -> Result<Option<MemoryAwaitedActionSubscriber<I, NowFn>>, Error> { | 
340  | 3  |         let maybe_client_awaited_action = self  | 
341  | 3  |             .client_operation_to_awaited_action  | 
342  | 3  |             .get(client_operation_id)  | 
343  | 3  |             .await;  | 
344  | 3  |         let Some(client_awaited_action) = maybe_client_awaited_action else {  Branch (344:13): [True: 0, False: 0]
   Branch (344:13): [Folded - Ignored]
   Branch (344:13): [True: 3, False: 0]
  | 
345  | 0  |             return Ok(None);  | 
346  |  |         };  | 
347  |  |  | 
348  | 3  |         self.operation_id_to_awaited_action  | 
349  | 3  |             .get(client_awaited_action.operation_id())  | 
350  | 3  |             .map(|tx| { | 
351  | 3  |                 Some(MemoryAwaitedActionSubscriber::new_with_client(  | 
352  | 3  |                     tx.subscribe(),  | 
353  | 3  |                     client_operation_id.clone(),  | 
354  | 3  |                     self.action_event_tx.clone(),  | 
355  | 3  |                     self.now_fn.clone(),  | 
356  | 3  |                 ))  | 
357  | 3  |             })  | 
358  | 3  |             .ok_or_else(|| {0   | 
359  | 0  |                 make_err!(  | 
360  | 0  |                     Code::Internal,  | 
361  |  |                     "Failed to get client operation id {client_operation_id:?}" | 
362  |  |                 )  | 
363  | 0  |             })  | 
364  | 3  |     }  | 
365  |  |  | 
366  |  |     /// Processes action events that need to be handled by the database.  | 
367  | 55  |     async fn handle_action_events(  | 
368  | 55  |         &mut self,  | 
369  | 55  |         action_events: impl IntoIterator<Item = ActionEvent>,  | 
370  | 55  |     ) -> NoEarlyReturn { | 
371  | 110  |         for action55  in action_events {  | 
372  | 55  |             debug!(?action, "Handling action");  | 
373  | 55  |             match action { | 
374  | 1  |                 ActionEvent::ClientDroppedOperation(operation_id) => { | 
375  |  |                     // Cleanup operation_id_to_awaited_action.  | 
376  | 1  |                     let Some(tx) = self.operation_id_to_awaited_action.remove(&operation_id) else {  Branch (376:25): [True: 0, False: 0]
   Branch (376:25): [Folded - Ignored]
   Branch (376:25): [True: 1, False: 0]
  | 
377  | 0  |                         error!(  | 
378  |  |                             ?operation_id,  | 
379  | 0  |                             "operation_id_to_awaited_action does not have operation_id"  | 
380  |  |                         );  | 
381  | 0  |                         continue;  | 
382  |  |                     };  | 
383  |  |  | 
384  | 1  |                     let connected_clients = match self  | 
385  | 1  |                         .connected_clients_for_operation_id  | 
386  | 1  |                         .entry(operation_id.clone())  | 
387  |  |                     { | 
388  | 1  |                         Entry::Occupied(entry) => { | 
389  | 1  |                             let value = *entry.get();  | 
390  | 1  |                             entry.remove();  | 
391  | 1  |                             value - 1  | 
392  |  |                         }  | 
393  |  |                         Entry::Vacant(_) => { | 
394  | 0  |                             error!(  | 
395  |  |                                 ?operation_id,  | 
396  | 0  |                                 "connected_clients_for_operation_id does not have operation_id"  | 
397  |  |                             );  | 
398  | 0  |                             0  | 
399  |  |                         }  | 
400  |  |                     };  | 
401  |  |  | 
402  |  |                     // Note: It is rare to have more than one client listening  | 
403  |  |                     // to the same action, so we assume that we are the last  | 
404  |  |                     // client and insert it back into the map if we detect that  | 
405  |  |                     // there are still clients listening (ie: the happy path  | 
406  |  |                     // is operation.connected_clients == 0).  | 
407  | 1  |                     if connected_clients != 0 {  Branch (407:24): [True: 0, False: 0]
   Branch (407:24): [Folded - Ignored]
   Branch (407:24): [True: 1, False: 0]
  | 
408  | 1  |                         self.operation_id_to_awaited_action  | 
409  | 1  |                             .insert(operation_id.clone(), tx);  | 
410  | 1  |                         self.connected_clients_for_operation_id  | 
411  | 1  |                             .insert(operation_id, connected_clients);  | 
412  | 1  |                         continue;  | 
413  | 0  |                     }  | 
414  | 0  |                     debug!(?operation_id, "Clearing operation from state manager");  | 
415  | 0  |                     let awaited_action = tx.borrow().clone();  | 
416  |  |                     // Cleanup action_info_hash_key_to_awaited_action if it was marked cached.  | 
417  | 0  |                     match &awaited_action.action_info().unique_qualifier { | 
418  | 0  |                         ActionUniqueQualifier::Cacheable(action_key) => { | 
419  | 0  |                             let maybe_awaited_action = self  | 
420  | 0  |                                 .action_info_hash_key_to_awaited_action  | 
421  | 0  |                                 .remove(action_key);  | 
422  | 0  |                             if !awaited_action.state().stage.is_finished()   Branch (422:32): [True: 0, False: 0]
   Branch (422:32): [Folded - Ignored]
   Branch (422:32): [True: 0, False: 0]
  | 
423  | 0  |                                 && maybe_awaited_action.is_none()   Branch (423:36): [True: 0, False: 0]
   Branch (423:36): [Folded - Ignored]
   Branch (423:36): [True: 0, False: 0]
  | 
424  |  |                             { | 
425  | 0  |                                 error!(  | 
426  |  |                                     ?operation_id,  | 
427  |  |                                     ?awaited_action,  | 
428  |  |                                     ?action_key,  | 
429  | 0  |                                     "action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync",  | 
430  |  |                                 );  | 
431  | 0  |                             }  | 
432  |  |                         }  | 
433  | 0  |                         ActionUniqueQualifier::Uncacheable(_action_key) => { | 
434  | 0  |                             // This Operation should not be in the hash_key map.  | 
435  | 0  |                         }  | 
436  |  |                     }  | 
437  |  |  | 
438  |  |                     // Cleanup sorted_awaited_action.  | 
439  | 0  |                     let sort_key = awaited_action.sort_key();  | 
440  | 0  |                     let sort_btree_for_state = self  | 
441  | 0  |                         .sorted_action_info_hash_keys  | 
442  | 0  |                         .btree_for_state(&awaited_action.state().stage);  | 
443  |  |  | 
444  | 0  |                     let maybe_sorted_awaited_action =  | 
445  | 0  |                         sort_btree_for_state.take(&SortedAwaitedAction { | 
446  | 0  |                             sort_key,  | 
447  | 0  |                             operation_id: operation_id.clone(),  | 
448  | 0  |                         });  | 
449  | 0  |                     if maybe_sorted_awaited_action.is_none() {  Branch (449:24): [True: 0, False: 0]
   Branch (449:24): [Folded - Ignored]
   Branch (449:24): [True: 0, False: 0]
  | 
450  | 0  |                         error!(  | 
451  |  |                             ?operation_id,  | 
452  |  |                             ?sort_key,  | 
453  | 0  |                             "Expected maybe_sorted_awaited_action to have {sort_key:?}", | 
454  |  |                         );  | 
455  | 0  |                     }  | 
456  |  |                 }  | 
457  | 54  |                 ActionEvent::ClientKeepAlive(client_id) => { | 
458  | 54  |                     if let Some(client_awaited_action) = self   Branch (458:28): [True: 0, False: 0]
   Branch (458:28): [Folded - Ignored]
   Branch (458:28): [True: 54, False: 0]
  | 
459  | 54  |                         .client_operation_to_awaited_action  | 
460  | 54  |                         .get(&client_id)  | 
461  | 54  |                         .await  | 
462  |  |                     { | 
463  | 54  |                         if let Some(awaited_action_sender) = self   Branch (463:32): [True: 0, False: 0]
   Branch (463:32): [Folded - Ignored]
   Branch (463:32): [True: 54, False: 0]
  | 
464  | 54  |                             .operation_id_to_awaited_action  | 
465  | 54  |                             .get(&client_awaited_action.operation_id)  | 
466  |  |                         { | 
467  | 54  |                             awaited_action_sender.send_if_modified(|awaited_action| { | 
468  | 54  |                                 awaited_action.update_client_keep_alive((self.now_fn)().now());  | 
469  | 54  |                                 false  | 
470  | 54  |                             });  | 
471  | 0  |                         }  | 
472  |  |                     } else { | 
473  | 0  |                         error!(  | 
474  |  |                             ?client_id,  | 
475  | 0  |                             "client_operation_to_awaited_action does not have client_id",  | 
476  |  |                         );  | 
477  |  |                     }  | 
478  |  |                 }  | 
479  |  |             }  | 
480  |  |         }  | 
481  | 55  |         NoEarlyReturn  | 
482  | 55  |     }  | 
483  |  |  | 
484  | 0  |     fn get_awaited_actions_range(  | 
485  | 0  |         &self,  | 
486  | 0  |         start: Bound<&OperationId>,  | 
487  | 0  |         end: Bound<&OperationId>,  | 
488  | 0  |     ) -> impl Iterator<Item = (&'_ OperationId, MemoryAwaitedActionSubscriber<I, NowFn>)>  | 
489  | 0  |     + use<'_, I, NowFn> { | 
490  | 0  |         self.operation_id_to_awaited_action  | 
491  | 0  |             .range((start, end))  | 
492  | 0  |             .map(|(operation_id, tx)| { | 
493  | 0  |                 (  | 
494  | 0  |                     operation_id,  | 
495  | 0  |                     MemoryAwaitedActionSubscriber::<I, NowFn>::new(tx.subscribe()),  | 
496  | 0  |                 )  | 
497  | 0  |             })  | 
498  | 0  |     }  | 
499  |  |  | 
500  | 583  |     fn get_by_operation_id(  | 
501  | 583  |         &self,  | 
502  | 583  |         operation_id: &OperationId,  | 
503  | 583  |     ) -> Option<MemoryAwaitedActionSubscriber<I, NowFn>> { | 
504  | 583  |         self.operation_id_to_awaited_action  | 
505  | 583  |             .get(operation_id)  | 
506  | 583  |             .map(|tx| MemoryAwaitedActionSubscriber::<I, NowFn>::new(tx.subscribe()))  | 
507  | 583  |     }  | 
508  |  |  | 
509  | 1.13k  |     fn get_range_of_actions(  | 
510  | 1.13k  |         &self,  | 
511  | 1.13k  |         state: SortedAwaitedActionState,  | 
512  | 1.13k  |         range: impl RangeBounds<SortedAwaitedAction>,  | 
513  | 1.13k  |     ) -> impl DoubleEndedIterator<  | 
514  | 1.13k  |         Item = Result<  | 
515  | 1.13k  |             (  | 
516  | 1.13k  |                 &SortedAwaitedAction,  | 
517  | 1.13k  |                 MemoryAwaitedActionSubscriber<I, NowFn>,  | 
518  | 1.13k  |             ),  | 
519  | 1.13k  |             Error,  | 
520  | 1.13k  |         >,  | 
521  | 1.13k  |     > { | 
522  | 1.13k  |         let btree = match state { | 
523  | 0  |             SortedAwaitedActionState::CacheCheck => &self.sorted_action_info_hash_keys.cache_check,  | 
524  | 1.13k  |             SortedAwaitedActionState::Queued => &self.sorted_action_info_hash_keys.queued,  | 
525  | 0  |             SortedAwaitedActionState::Executing => &self.sorted_action_info_hash_keys.executing,  | 
526  | 0  |             SortedAwaitedActionState::Completed => &self.sorted_action_info_hash_keys.completed,  | 
527  |  |         };  | 
528  | 1.13k  |         btree.range(range).map(|sorted_awaited_action| {543   | 
529  | 543  |             let operation_id = &sorted_awaited_action.operation_id;  | 
530  | 543  |             self.get_by_operation_id(operation_id)  | 
531  | 543  |                 .ok_or_else(|| {0   | 
532  | 0  |                     make_err!(  | 
533  | 0  |                         Code::Internal,  | 
534  |  |                         "Failed to get operation id {}", | 
535  |  |                         operation_id  | 
536  |  |                     )  | 
537  | 0  |                 })  | 
538  | 543  |                 .map(|subscriber| (sorted_awaited_action, subscriber))  | 
539  | 543  |         })  | 
540  | 1.13k  |     }  | 
541  |  |  | 
542  | 39  |     fn process_state_changes_for_hash_key_map(  | 
543  | 39  |         action_info_hash_key_to_awaited_action: &mut HashMap<ActionUniqueKey, OperationId>,  | 
544  | 39  |         new_awaited_action: &AwaitedAction,  | 
545  | 39  |     ) { | 
546  |  |         // Only process changes if the stage is not finished.  | 
547  | 39  |         if !new_awaited_action.state().stage.is_finished() {  Branch (547:12): [True: 0, False: 0]
   Branch (547:12): [Folded - Ignored]
   Branch (547:12): [True: 33, False: 6]
  | 
548  | 33  |             return;  | 
549  | 6  |         }  | 
550  | 6  |         match &new_awaited_action.action_info().unique_qualifier { | 
551  | 6  |             ActionUniqueQualifier::Cacheable(action_key) => { | 
552  | 6  |                 let maybe_awaited_action =  | 
553  | 6  |                     action_info_hash_key_to_awaited_action.remove(action_key);  | 
554  | 6  |                 match maybe_awaited_action { | 
555  | 6  |                     Some(removed_operation_id) => { | 
556  | 6  |                         if &removed_operation_id != new_awaited_action.operation_id() {  Branch (556:28): [True: 0, False: 0]
   Branch (556:28): [Folded - Ignored]
   Branch (556:28): [True: 0, False: 6]
  | 
557  | 0  |                             error!(  | 
558  |  |                                 ?removed_operation_id,  | 
559  |  |                                 ?new_awaited_action,  | 
560  |  |                                 ?action_key,  | 
561  | 0  |                                 "action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync",  | 
562  |  |                             );  | 
563  | 6  |                         }  | 
564  |  |                     }  | 
565  |  |                     None => { | 
566  | 0  |                         error!(  | 
567  |  |                             ?new_awaited_action,  | 
568  |  |                             ?action_key,  | 
569  | 0  |                             "action_info_hash_key_to_awaited_action out of sync, it should have had the unique_key",  | 
570  |  |                         );  | 
571  |  |                     }  | 
572  |  |                 }  | 
573  |  |             }  | 
574  | 0  |             ActionUniqueQualifier::Uncacheable(_action_key) => { | 
575  | 0  |                 // If we are not cacheable, the action should not be in the  | 
576  | 0  |                 // hash_key map, so we don't need to process anything in  | 
577  | 0  |                 // action_info_hash_key_to_awaited_action.  | 
578  | 0  |             }  | 
579  |  |         }  | 
580  | 39  |     }  | 
581  |  |  | 
582  | 39  |     fn update_awaited_action(  | 
583  | 39  |         &mut self,  | 
584  | 39  |         mut new_awaited_action: AwaitedAction,  | 
585  | 39  |     ) -> Result<(), Error> { | 
586  | 39  |         let tx = self  | 
587  | 39  |             .operation_id_to_awaited_action  | 
588  | 39  |             .get(new_awaited_action.operation_id())  | 
589  | 39  |             .ok_or_else(|| {0   | 
590  | 0  |                 make_err!(  | 
591  | 0  |                     Code::Internal,  | 
592  |  |                     "OperationId does not exist in map in AwaitedActionDb::update_awaited_action"  | 
593  |  |                 )  | 
594  | 0  |             })?;  | 
595  |  |         { | 
596  |  |             // Note: It's important to drop old_awaited_action before we call  | 
597  |  |             // send_replace or we will have a deadlock.  | 
598  | 39  |             let old_awaited_action = tx.borrow();  | 
599  |  |  | 
600  |  |             // Do not process changes if the action version is not in sync with  | 
601  |  |             // what the sender based the update on.  | 
602  | 39  |             if old_awaited_action.version() != new_awaited_action.version() {  Branch (602:16): [True: 0, False: 0]
   Branch (602:16): [Folded - Ignored]
   Branch (602:16): [True: 0, False: 39]
  | 
603  | 0  |                 return Err(make_err!(  | 
604  | 0  |                     // From: https://grpc.github.io/grpc/core/md_doc_statuscodes.html  | 
605  | 0  |                     // Use ABORTED if the client should retry at a higher level  | 
606  | 0  |                     // (e.g., when a client-specified test-and-set fails,  | 
607  | 0  |                     // indicating the client should restart a read-modify-write  | 
608  | 0  |                     // sequence)  | 
609  | 0  |                     Code::Aborted,  | 
610  | 0  |                     "{} Expected {} but got {} for operation_id {:?} - {:?}", | 
611  | 0  |                     "Tried to update an awaited action with an incorrect version.",  | 
612  | 0  |                     old_awaited_action.version(),  | 
613  | 0  |                     new_awaited_action.version(),  | 
614  | 0  |                     old_awaited_action,  | 
615  | 0  |                     new_awaited_action,  | 
616  | 0  |                 ));  | 
617  | 39  |             }  | 
618  | 39  |             new_awaited_action.increment_version();  | 
619  |  |  | 
620  | 0  |             error_if!(  | 
621  | 39  |                 old_awaited_action.action_info().unique_qualifier   Branch (621:17): [True: 0, False: 0]
   Branch (621:17): [Folded - Ignored]
   Branch (621:17): [True: 0, False: 39]
  | 
622  | 39  |                     != new_awaited_action.action_info().unique_qualifier,  | 
623  |  |                 "Unique key changed for operation_id {:?} - {:?} - {:?}", | 
624  | 0  |                 new_awaited_action.operation_id(),  | 
625  | 0  |                 old_awaited_action.action_info(),  | 
626  | 0  |                 new_awaited_action.action_info(),  | 
627  |  |             );  | 
628  | 39  |             let is_same_stage = old_awaited_action  | 
629  | 39  |                 .state()  | 
630  | 39  |                 .stage  | 
631  | 39  |                 .is_same_stage(&new_awaited_action.state().stage);  | 
632  |  |  | 
633  | 39  |             if !is_same_stage {  Branch (633:16): [True: 0, False: 0]
   Branch (633:16): [Folded - Ignored]
   Branch (633:16): [True: 39, False: 0]
  | 
634  | 39  |                 self.sorted_action_info_hash_keys  | 
635  | 39  |                     .process_state_changes(&old_awaited_action, &new_awaited_action)?0 ;  | 
636  | 39  |                 Self::process_state_changes_for_hash_key_map(  | 
637  | 39  |                     &mut self.action_info_hash_key_to_awaited_action,  | 
638  | 39  |                     &new_awaited_action,  | 
639  |  |                 );  | 
640  | 0  |             }  | 
641  |  |         }  | 
642  |  |  | 
643  |  |         // Notify all listeners of the new state and ignore if no one is listening.  | 
644  |  |         // Note: Do not use `.send()` as it will not update the state if all listeners  | 
645  |  |         // are dropped.  | 
646  | 39  |         drop(tx.send_replace(new_awaited_action));  | 
647  |  |  | 
648  | 39  |         Ok(())  | 
649  | 39  |     }  | 
650  |  |  | 
651  |  |     /// Creates a new [`ClientAwaitedAction`] and a [`watch::Receiver`] to  | 
652  |  |     /// listen for changes. We don't do this in-line because it is important  | 
653  |  |     /// to ALWAYS construct a [`ClientAwaitedAction`] before inserting it into  | 
654  |  |     /// the map. Failing to do so may result in memory leaks. This is because  | 
655  |  |     /// [`ClientAwaitedAction`] implements a drop function that will trigger  | 
656  |  |     /// cleanup of the other maps on drop.  | 
657  | 24  |     fn make_client_awaited_action(  | 
658  | 24  |         &mut self,  | 
659  | 24  |         operation_id: &OperationId,  | 
660  | 24  |         awaited_action: AwaitedAction,  | 
661  | 24  |     ) -> (Arc<ClientAwaitedAction>, watch::Receiver<AwaitedAction>) { | 
662  | 24  |         let (tx, rx) = watch::channel(awaited_action);  | 
663  | 24  |         let client_awaited_action = Arc::new(ClientAwaitedAction::new(  | 
664  | 24  |             operation_id.clone(),  | 
665  | 24  |             self.action_event_tx.clone(),  | 
666  |  |         ));  | 
667  | 24  |         self.operation_id_to_awaited_action  | 
668  | 24  |             .insert(operation_id.clone(), tx);  | 
669  | 24  |         self.connected_clients_for_operation_id  | 
670  | 24  |             .insert(operation_id.clone(), 1);  | 
671  | 24  |         (client_awaited_action, rx)  | 
672  | 24  |     }  | 
673  |  |  | 
674  | 27  |     async fn add_action(  | 
675  | 27  |         &mut self,  | 
676  | 27  |         client_operation_id: OperationId,  | 
677  | 27  |         action_info: Arc<ActionInfo>,  | 
678  | 27  |     ) -> Result<MemoryAwaitedActionSubscriber<I, NowFn>, Error> { | 
679  |  |         // Check to see if the action is already known and subscribe if it is.  | 
680  | 27  |         let subscription_result = self  | 
681  | 27  |             .try_subscribe(  | 
682  | 27  |                 &client_operation_id,  | 
683  | 27  |                 &action_info.unique_qualifier,  | 
684  | 27  |                 action_info.priority,  | 
685  | 27  |             )  | 
686  | 27  |             .await  | 
687  | 27  |             .err_tip(|| "In AwaitedActionDb::subscribe_or_add_action");  | 
688  | 27  |         match subscription_result { | 
689  | 0  |             Err(err) => return Err(err),  | 
690  | 3  |             Ok(Some(subscription)) => return Ok(subscription),  | 
691  | 24  |             Ok(None) => { /* Add item to queue. */ } | 
692  |  |         }  | 
693  |  |  | 
694  | 24  |         let maybe_unique_key = match &action_info.unique_qualifier { | 
695  | 24  |             ActionUniqueQualifier::Cacheable(unique_key) => Some(unique_key.clone()),  | 
696  | 0  |             ActionUniqueQualifier::Uncacheable(_unique_key) => None,  | 
697  |  |         };  | 
698  | 24  |         let operation_id = OperationId::default();  | 
699  | 24  |         let awaited_action =  | 
700  | 24  |             AwaitedAction::new(operation_id.clone(), action_info, (self.now_fn)().now());  | 
701  | 24  |         debug_assert!(  | 
702  | 0  |             ActionStage::Queued == awaited_action.state().stage,  | 
703  | 0  |             "Expected action to be queued"  | 
704  |  |         );  | 
705  | 24  |         let sort_key = awaited_action.sort_key();  | 
706  |  |  | 
707  | 24  |         let (client_awaited_action, rx) =  | 
708  | 24  |             self.make_client_awaited_action(&operation_id.clone(), awaited_action);  | 
709  |  |  | 
710  | 24  |         debug!(  | 
711  |  |             ?client_operation_id,  | 
712  |  |             ?operation_id,  | 
713  |  |             ?client_awaited_action,  | 
714  | 24  |             "Adding action"  | 
715  |  |         );  | 
716  |  |  | 
717  | 24  |         self.client_operation_to_awaited_action  | 
718  | 24  |             .insert(client_operation_id.clone(), client_awaited_action)  | 
719  | 24  |             .await;  | 
720  |  |  | 
721  |  |         // Note: We only put items in the map that are cacheable.  | 
722  | 24  |         if let Some(unique_key) = maybe_unique_key {  Branch (722:16): [True: 0, False: 0]
   Branch (722:16): [Folded - Ignored]
   Branch (722:16): [True: 24, False: 0]
  | 
723  | 24  |             let old_value = self  | 
724  | 24  |                 .action_info_hash_key_to_awaited_action  | 
725  | 24  |                 .insert(unique_key, operation_id.clone());  | 
726  | 24  |             if let Some(old_value0 ) = old_value {   Branch (726:20): [True: 0, False: 0]
   Branch (726:20): [Folded - Ignored]
   Branch (726:20): [True: 0, False: 24]
  | 
727  | 0  |                 error!(  | 
728  |  |                     ?operation_id,  | 
729  |  |                     ?old_value,  | 
730  | 0  |                     "action_info_hash_key_to_awaited_action already has unique_key"  | 
731  |  |                 );  | 
732  | 24  |             }  | 
733  | 0  |         }  | 
734  |  |  | 
735  | 24  |         self.sorted_action_info_hash_keys  | 
736  | 24  |             .insert_sort_map_for_stage(  | 
737  | 24  |                 &ActionStage::Queued,  | 
738  | 24  |                 &SortedAwaitedAction { | 
739  | 24  |                     sort_key,  | 
740  | 24  |                     operation_id,  | 
741  | 24  |                 },  | 
742  |  |             )  | 
743  | 24  |             .err_tip(|| "In AwaitedActionDb::subscribe_or_add_action")?0 ;  | 
744  |  |  | 
745  | 24  |         Ok(MemoryAwaitedActionSubscriber::new_with_client(  | 
746  | 24  |             rx,  | 
747  | 24  |             client_operation_id,  | 
748  | 24  |             self.action_event_tx.clone(),  | 
749  | 24  |             self.now_fn.clone(),  | 
750  | 24  |         ))  | 
751  | 27  |     }  | 
752  |  |  | 
753  | 27  |     async fn try_subscribe(  | 
754  | 27  |         &mut self,  | 
755  | 27  |         client_operation_id: &OperationId,  | 
756  | 27  |         unique_qualifier: &ActionUniqueQualifier,  | 
757  | 27  |         // TODO(palfrey) To simplify the scheduler 2024 refactor, we  | 
758  | 27  |         // removed the ability to upgrade priorities of actions.  | 
759  | 27  |         // we should add priority upgrades back in.  | 
760  | 27  |         _priority: i32,  | 
761  | 27  |     ) -> Result<Option<MemoryAwaitedActionSubscriber<I, NowFn>>, Error> { | 
762  | 27  |         let unique_key = match unique_qualifier { | 
763  | 27  |             ActionUniqueQualifier::Cacheable(unique_key) => unique_key,  | 
764  | 0  |             ActionUniqueQualifier::Uncacheable(_unique_key) => return Ok(None),  | 
765  |  |         };  | 
766  |  |  | 
767  | 27  |         let Some(operation_id3 ) = self.action_info_hash_key_to_awaited_action.get(unique_key) else {   Branch (767:13): [True: 0, False: 0]
   Branch (767:13): [Folded - Ignored]
   Branch (767:13): [True: 3, False: 24]
  | 
768  | 24  |             return Ok(None); // Not currently running.  | 
769  |  |         };  | 
770  |  |  | 
771  | 3  |         let Some(tx) = self.operation_id_to_awaited_action.get(operation_id) else {  Branch (771:13): [True: 0, False: 0]
   Branch (771:13): [Folded - Ignored]
   Branch (771:13): [True: 3, False: 0]
  | 
772  | 0  |             return Err(make_err!(  | 
773  | 0  |                 Code::Internal,  | 
774  | 0  |                 "operation_id_to_awaited_action and action_info_hash_key_to_awaited_action are out of sync for {unique_key:?} - {operation_id}" | 
775  | 0  |             ));  | 
776  |  |         };  | 
777  |  |  | 
778  | 0  |         error_if!(  | 
779  | 3  |             tx.borrow().state().stage.is_finished(),   Branch (779:13): [True: 0, False: 0]
   Branch (779:13): [Folded - Ignored]
   Branch (779:13): [True: 0, False: 3]
  | 
780  |  |             "Tried to subscribe to a completed action but it already finished. This should never happen. {:?}", | 
781  | 0  |             tx.borrow()  | 
782  |  |         );  | 
783  |  |  | 
784  | 3  |         let maybe_connected_clients = self  | 
785  | 3  |             .connected_clients_for_operation_id  | 
786  | 3  |             .get_mut(operation_id);  | 
787  | 3  |         let Some(connected_clients) = maybe_connected_clients else {  Branch (787:13): [True: 0, False: 0]
   Branch (787:13): [Folded - Ignored]
   Branch (787:13): [True: 3, False: 0]
  | 
788  | 0  |             return Err(make_err!(  | 
789  | 0  |                 Code::Internal,  | 
790  | 0  |                 "connected_clients_for_operation_id and operation_id_to_awaited_action are out of sync for {unique_key:?} - {operation_id}" | 
791  | 0  |             ));  | 
792  |  |         };  | 
793  | 3  |         *connected_clients += 1;  | 
794  |  |  | 
795  |  |         // Immediately mark the keep alive, we don't need to wake anyone  | 
796  |  |         // so we always fake that it was not actually changed.  | 
797  |  |         // Failing update the client could lead to the client connecting  | 
798  |  |         // then not updating the keep alive in time, resulting in the  | 
799  |  |         // operation timing out due to async behavior.  | 
800  | 3  |         tx.send_if_modified(|awaited_action| { | 
801  | 3  |             awaited_action.update_client_keep_alive((self.now_fn)().now());  | 
802  | 3  |             false  | 
803  | 3  |         });  | 
804  | 3  |         let subscription = tx.subscribe();  | 
805  |  |  | 
806  | 3  |         self.client_operation_to_awaited_action  | 
807  | 3  |             .insert(  | 
808  | 3  |                 client_operation_id.clone(),  | 
809  | 3  |                 Arc::new(ClientAwaitedAction::new(  | 
810  | 3  |                     operation_id.clone(),  | 
811  | 3  |                     self.action_event_tx.clone(),  | 
812  | 3  |                 )),  | 
813  | 3  |             )  | 
814  | 3  |             .await;  | 
815  |  |  | 
816  | 3  |         Ok(Some(MemoryAwaitedActionSubscriber::new_with_client(  | 
817  | 3  |             subscription,  | 
818  | 3  |             client_operation_id.clone(),  | 
819  | 3  |             self.action_event_tx.clone(),  | 
820  | 3  |             self.now_fn.clone(),  | 
821  | 3  |         )))  | 
822  | 27  |     }  | 
823  |  | }  | 
824  |  |  | 
/// In-memory awaited-action database.
///
/// All state lives in a mutex-guarded [`AwaitedActionDbImpl`] that is shared
/// with a task spawned in `new` to process action events.
#[derive(Debug, MetricsComponent)]
pub struct MemoryAwaitedActionDb<I: InstantWrapper, NowFn: Fn() -> I> {
    /// Shared state; locked by every operation before it is accessed.
    #[metric]
    inner: Arc<Mutex<AwaitedActionDbImpl<I, NowFn>>>,
    /// Notified after an action was successfully added or updated.
    tasks_change_notify: Arc<Notify>,
    /// Handle of the `handle_awaited_action_events` task spawned in `new`.
    // NOTE(review): presumably the guard stops the task when dropped — confirm
    // against `JoinHandleDropGuard`.
    _handle_awaited_action_events: JoinHandleDropGuard<()>,
}
832  |  |  | 
833  |  | impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static>  | 
834  |  |     MemoryAwaitedActionDb<I, NowFn>  | 
835  |  | { | 
836  | 20  |     pub fn new(  | 
837  | 20  |         eviction_config: &EvictionPolicy,  | 
838  | 20  |         tasks_change_notify: Arc<Notify>,  | 
839  | 20  |         now_fn: NowFn,  | 
840  | 20  |     ) -> Self { | 
841  | 20  |         let (action_event_tx, mut action_event_rx) = mpsc::unbounded_channel();  | 
842  | 20  |         let inner = Arc::new(Mutex::new(AwaitedActionDbImpl { | 
843  | 20  |             client_operation_to_awaited_action: EvictingMap::new(eviction_config, (now_fn)()),  | 
844  | 20  |             operation_id_to_awaited_action: BTreeMap::new(),  | 
845  | 20  |             action_info_hash_key_to_awaited_action: HashMap::new(),  | 
846  | 20  |             sorted_action_info_hash_keys: SortedAwaitedActions::default(),  | 
847  | 20  |             connected_clients_for_operation_id: HashMap::new(),  | 
848  | 20  |             action_event_tx,  | 
849  | 20  |             now_fn,  | 
850  | 20  |         }));  | 
851  | 20  |         let weak_inner = Arc::downgrade(&inner);  | 
852  |  |         Self { | 
853  | 20  |             inner,  | 
854  | 20  |             tasks_change_notify,  | 
855  | 20  |             _handle_awaited_action_events: spawn!("handle_awaited_action_events", async move { | 
856  | 20  |                 let mut dropped_operation_ids = Vec::with_capacity(MAX_ACTION_EVENTS_RX_PER_CYCLE);  | 
857  |  |                 loop { | 
858  | 75  |                     dropped_operation_ids.clear();  | 
859  | 75  |                     action_event_rx  | 
860  | 75  |                         .recv_many(&mut dropped_operation_ids, MAX_ACTION_EVENTS_RX_PER_CYCLE)  | 
861  | 75  |                         .await;  | 
862  | 55  |                     let Some(inner) = weak_inner.upgrade() else {  Branch (862:25): [True: 0, False: 0]
   Branch (862:25): [Folded - Ignored]
   Branch (862:25): [True: 55, False: 0]
  | 
863  | 0  |                         return; // Nothing to cleanup, our struct is dropped.  | 
864  |  |                     };  | 
865  | 55  |                     let mut inner = inner.lock().await;  | 
866  | 55  |                     inner  | 
867  | 55  |                         .handle_action_events(dropped_operation_ids.drain(..))  | 
868  | 55  |                         .await;  | 
869  |  |                 }  | 
870  | 0  |             }),  | 
871  |  |         }  | 
872  | 20  |     }  | 
873  |  | }  | 
874  |  |  | 
875  |  | impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> AwaitedActionDb  | 
876  |  |     for MemoryAwaitedActionDb<I, NowFn>  | 
877  |  | { | 
878  |  |     type Subscriber = MemoryAwaitedActionSubscriber<I, NowFn>;  | 
879  |  |  | 
880  | 3  |     async fn get_awaited_action_by_id(  | 
881  | 3  |         &self,  | 
882  | 3  |         client_operation_id: &OperationId,  | 
883  | 3  |     ) -> Result<Option<Self::Subscriber>, Error> { | 
884  | 3  |         self.inner  | 
885  | 3  |             .lock()  | 
886  | 3  |             .await  | 
887  | 3  |             .get_awaited_action_by_id(client_operation_id)  | 
888  | 3  |             .await  | 
889  | 3  |     }  | 
890  |  |  | 
891  | 0  |     async fn get_all_awaited_actions(  | 
892  | 0  |         &self,  | 
893  | 0  |     ) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>>, Error> { | 
894  | 0  |         Ok(ChunkedStream::new(  | 
895  | 0  |             Bound::Unbounded,  | 
896  | 0  |             Bound::Unbounded,  | 
897  | 0  |             move |start, end, mut output| async move { | 
898  | 0  |                 let inner = self.inner.lock().await;  | 
899  | 0  |                 let mut maybe_new_start = None;  | 
900  |  |  | 
901  | 0  |                 for (operation_id, item) in  | 
902  | 0  |                     inner.get_awaited_actions_range(start.as_ref(), end.as_ref())  | 
903  | 0  |                 { | 
904  | 0  |                     output.push_back(item);  | 
905  | 0  |                     maybe_new_start = Some(operation_id);  | 
906  | 0  |                 }  | 
907  |  |  | 
908  | 0  |                 Ok(maybe_new_start  | 
909  | 0  |                     .map(|new_start| ((Bound::Excluded(new_start.clone()), end), output)))  | 
910  | 0  |             },  | 
911  |  |         ))  | 
912  | 0  |     }  | 
913  |  |  | 
914  | 40  |     async fn get_by_operation_id(  | 
915  | 40  |         &self,  | 
916  | 40  |         operation_id: &OperationId,  | 
917  | 40  |     ) -> Result<Option<Self::Subscriber>, Error> { | 
918  | 40  |         Ok(self.inner.lock().await.get_by_operation_id(operation_id))  | 
919  | 40  |     }  | 
920  |  |  | 
921  | 590  |     async fn get_range_of_actions(  | 
922  | 590  |         &self,  | 
923  | 590  |         state: SortedAwaitedActionState,  | 
924  | 590  |         start: Bound<SortedAwaitedAction>,  | 
925  | 590  |         end: Bound<SortedAwaitedAction>,  | 
926  | 590  |         desc: bool,  | 
927  | 590  |     ) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>> + Send, Error> { | 
928  | 590  |         Ok(ChunkedStream::new(  | 
929  | 590  |             start,  | 
930  | 590  |             end,  | 
931  | 1.13k  |             move |start, end, mut output| async move { | 
932  | 1.13k  |                 let inner = self.inner.lock().await;  | 
933  | 1.13k  |                 let mut done = true;  | 
934  | 1.13k  |                 let mut new_start = start.as_ref();  | 
935  | 1.13k  |                 let mut new_end = end.as_ref();  | 
936  |  |  | 
937  | 1.13k  |                 let iterator = inner  | 
938  | 1.13k  |                     .get_range_of_actions(state, (start.as_ref(), end.as_ref()))  | 
939  | 1.13k  |                     .map(|res| res543 .err_tip543 (|| "In AwaitedActionDb::get_range_of_actions"));  | 
940  |  |  | 
941  |  |                 // TODO(palfrey) This should probably use the `.left()/right()` pattern,  | 
942  |  |                 // but that doesn't exist in the std or any libraries we use.  | 
943  | 1.13k  |                 if desc {  Branch (943:20): [True: 0, False: 0]
   Branch (943:20): [Folded - Ignored]
   Branch (943:20): [True: 130, False: 1.00k]
  | 
944  | 130  |                     for result43  in iterator.rev() {  | 
945  | 43  |                         let (sorted_awaited_action, item) =  | 
946  | 43  |                             result.err_tip(|| "In AwaitedActionDb::get_range_of_actions")?0 ;  | 
947  | 43  |                         output.push_back(item);  | 
948  | 43  |                         new_end = Bound::Excluded(sorted_awaited_action);  | 
949  | 43  |                         done = false;  | 
950  |  |                     }  | 
951  |  |                 } else { | 
952  | 1.50k  |                     for result500  in iterator {  | 
953  | 500  |                         let (sorted_awaited_action, item) =  | 
954  | 500  |                             result.err_tip(|| "In AwaitedActionDb::get_range_of_actions")?0 ;  | 
955  | 500  |                         output.push_back(item);  | 
956  | 500  |                         new_start = Bound::Excluded(sorted_awaited_action);  | 
957  | 500  |                         done = false;  | 
958  |  |                     }  | 
959  |  |                 }  | 
960  | 1.13k  |                 if done {  Branch (960:20): [True: 0, False: 0]
   Branch (960:20): [Folded - Ignored]
   Branch (960:20): [True: 590, False: 540]
  | 
961  | 590  |                     return Ok(None);  | 
962  | 540  |                 }  | 
963  | 540  |                 Ok(Some(((new_start.cloned(), new_end.cloned()), output)))  | 
964  | 2.26k  |             },  | 
965  |  |         ))  | 
966  | 590  |     }  | 
967  |  |  | 
968  | 39  |     async fn update_awaited_action(&self, new_awaited_action: AwaitedAction) -> Result<(), Error> { | 
969  | 39  |         self.inner  | 
970  | 39  |             .lock()  | 
971  | 39  |             .await  | 
972  | 39  |             .update_awaited_action(new_awaited_action)?0 ;  | 
973  | 39  |         self.tasks_change_notify.notify_one();  | 
974  | 39  |         Ok(())  | 
975  | 39  |     }  | 
976  |  |  | 
977  | 27  |     async fn add_action(  | 
978  | 27  |         &self,  | 
979  | 27  |         client_operation_id: OperationId,  | 
980  | 27  |         action_info: Arc<ActionInfo>,  | 
981  | 27  |         _no_event_action_timeout: Duration,  | 
982  | 27  |     ) -> Result<Self::Subscriber, Error> { | 
983  | 27  |         let subscriber = self  | 
984  | 27  |             .inner  | 
985  | 27  |             .lock()  | 
986  | 27  |             .await  | 
987  | 27  |             .add_action(client_operation_id, action_info)  | 
988  | 27  |             .await?0 ;  | 
989  | 27  |         self.tasks_change_notify.notify_one();  | 
990  | 27  |         Ok(subscriber)  | 
991  | 27  |     }  | 
992  |  | }  |