/build/source/nativelink-scheduler/src/memory_awaited_action_db.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::ops::{Bound, RangeBounds}; |
16 | | use core::time::Duration; |
17 | | use std::collections::hash_map::Entry; |
18 | | use std::collections::{BTreeMap, BTreeSet, HashMap}; |
19 | | use std::sync::Arc; |
20 | | |
21 | | use async_lock::Mutex; |
22 | | use futures::{FutureExt, Stream}; |
23 | | use nativelink_config::stores::EvictionPolicy; |
24 | | use nativelink_error::{Code, Error, ResultExt, error_if, make_err}; |
25 | | use nativelink_metric::MetricsComponent; |
26 | | use nativelink_util::action_messages::{ |
27 | | ActionInfo, ActionStage, ActionUniqueKey, ActionUniqueQualifier, OperationId, |
28 | | }; |
29 | | use nativelink_util::chunked_stream::ChunkedStream; |
30 | | use nativelink_util::evicting_map::{EvictingMap, LenEntry}; |
31 | | use nativelink_util::instant_wrapper::InstantWrapper; |
32 | | use nativelink_util::metrics::{ |
33 | | EXECUTION_METRICS, ExecutionResult, ExecutionStage, make_execution_attributes, |
34 | | }; |
35 | | use nativelink_util::spawn; |
36 | | use nativelink_util::task::JoinHandleDropGuard; |
37 | | use tokio::sync::{Notify, mpsc, watch}; |
38 | | use tracing::{debug, error}; |
39 | | |
40 | | use crate::awaited_action_db::{ |
41 | | AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, CLIENT_KEEPALIVE_DURATION, |
42 | | SortedAwaitedAction, SortedAwaitedActionState, |
43 | | }; |
44 | | |
/// Number of events to process per cycle.
///
/// NOTE(review): presumably the maximum number of `ActionEvent`s drained from
/// the event channel in a single pass; the consumer of this constant is not
/// visible in this section of the file — confirm at the use site.
const MAX_ACTION_EVENTS_RX_PER_CYCLE: usize = 1024;
47 | | |
/// Represents a client that is currently listening to an action.
///
/// When this value is dropped, its `Drop` implementation sends an
/// [`ActionEvent::ClientDroppedOperation`] carrying `operation_id` on
/// `event_tx`, so the receiver of that channel can perform any remaining
/// cleanup for the operation.
#[derive(Debug)]
struct ClientAwaitedAction {
    /// The `OperationId` that the client is listening to.
    operation_id: OperationId,

    /// The sender to notify of this struct being dropped.
    event_tx: mpsc::UnboundedSender<ActionEvent>,
}
59 | | |
60 | | impl ClientAwaitedAction { |
61 | 27 | pub(crate) const fn new( |
62 | 27 | operation_id: OperationId, |
63 | 27 | event_tx: mpsc::UnboundedSender<ActionEvent>, |
64 | 27 | ) -> Self { |
65 | 27 | Self { |
66 | 27 | operation_id, |
67 | 27 | event_tx, |
68 | 27 | } |
69 | 27 | } |
70 | | |
71 | 3 | pub(crate) const fn operation_id(&self) -> &OperationId { |
72 | 3 | &self.operation_id |
73 | 3 | } |
74 | | } |
75 | | |
76 | | impl Drop for ClientAwaitedAction { |
77 | 27 | fn drop(&mut self) { |
78 | | // If we failed to send it means noone is listening. |
79 | 27 | drop(self.event_tx.send(ActionEvent::ClientDroppedOperation( |
80 | 27 | self.operation_id.clone(), |
81 | 27 | ))); |
82 | 27 | } |
83 | | } |
84 | | |
/// Trait to be able to use the `EvictingMap` with `ClientAwaitedAction`.
/// Note: We only use `EvictingMap` for a time based eviction, which is
/// why the implementation has fixed default values in it.
impl LenEntry for ClientAwaitedAction {
    /// Always reports a length of `0`; every entry has the same fixed size
    /// as far as the map is concerned.
    #[inline]
    fn len(&self) -> u64 {
        0
    }

    /// Always `true`, consistent with the fixed `len()` of `0`.
    #[inline]
    fn is_empty(&self) -> bool {
        true
    }
}
99 | | |
/// Events the `AwaitedActionDb` needs to process.
///
/// Values are sent over an unbounded `mpsc` channel by
/// `ClientAwaitedAction` (on drop) and by `MemoryAwaitedActionSubscriber`
/// (keep-alives), and are consumed by `handle_action_events`.
#[derive(Debug)]
pub(crate) enum ActionEvent {
    /// A client has sent a keep alive message. Carries the client
    /// operation id of the subscriber that is still listening.
    ClientKeepAlive(OperationId),
    /// A client has dropped and pointed to `OperationId`. Carries the
    /// operation id stored in the dropped `ClientAwaitedAction`.
    ClientDroppedOperation(OperationId),
}
108 | | |
/// Information required to track an individual client
/// keep alive config and state.
#[derive(Debug)]
struct ClientInfo<I: InstantWrapper, NowFn: Fn() -> I> {
    /// The client operation id.
    client_operation_id: OperationId,
    /// The last time a keep alive was sent.
    last_keep_alive: I,
    /// The function to get the current time.
    now_fn: NowFn,
    /// The sender used to report events (keep-alives) for this client.
    event_tx: mpsc::UnboundedSender<ActionEvent>,
}
122 | | |
/// Subscriber that clients can use to monitor when an `AwaitedAction`
/// changes.
#[derive(Debug)]
pub struct MemoryAwaitedActionSubscriber<I: InstantWrapper, NowFn: Fn() -> I> {
    /// The receiver to listen for changes.
    awaited_action_rx: watch::Receiver<AwaitedAction>,
    /// If a client id is known this is the info needed to keep the client
    /// action alive. `None` for subscribers created without a client
    /// operation id; those never send keep-alive events.
    client_info: Option<ClientInfo<I, NowFn>>,
}
132 | | |
133 | | impl<I: InstantWrapper, NowFn: Fn() -> I> MemoryAwaitedActionSubscriber<I, NowFn> { |
134 | 583 | fn new(mut awaited_action_rx: watch::Receiver<AwaitedAction>) -> Self { |
135 | 583 | awaited_action_rx.mark_changed(); |
136 | 583 | Self { |
137 | 583 | awaited_action_rx, |
138 | 583 | client_info: None, |
139 | 583 | } |
140 | 583 | } |
141 | | |
142 | 30 | fn new_with_client( |
143 | 30 | mut awaited_action_rx: watch::Receiver<AwaitedAction>, |
144 | 30 | client_operation_id: OperationId, |
145 | 30 | event_tx: mpsc::UnboundedSender<ActionEvent>, |
146 | 30 | now_fn: NowFn, |
147 | 30 | ) -> Self |
148 | 30 | where |
149 | 30 | NowFn: Fn() -> I, |
150 | | { |
151 | 30 | awaited_action_rx.mark_changed(); |
152 | 30 | Self { |
153 | 30 | awaited_action_rx, |
154 | 30 | client_info: Some(ClientInfo { |
155 | 30 | client_operation_id, |
156 | 30 | last_keep_alive: I::from_secs(0), |
157 | 30 | now_fn, |
158 | 30 | event_tx, |
159 | 30 | }), |
160 | 30 | } |
161 | 30 | } |
162 | | } |
163 | | |
impl<I, NowFn> AwaitedActionSubscriber for MemoryAwaitedActionSubscriber<I, NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + 'static,
{
    /// Waits until the watched `AwaitedAction` changes and returns a clone
    /// of the new value.
    ///
    /// When this subscriber belongs to a client (`client_info` is `Some`),
    /// the wait is interleaved with keep-alive handling: whenever more than
    /// `CLIENT_KEEPALIVE_DURATION` has elapsed since the last keep-alive, an
    /// `ActionEvent::ClientKeepAlive` is sent on the client's `event_tx`.
    /// The returned action then has its client operation id set to this
    /// client's id.
    ///
    /// # Errors
    ///
    /// Returns a `Code::Internal` error if waiting on the watch channel
    /// fails.
    async fn changed(&mut self) -> Result<AwaitedAction, Error> {
        let client_operation_id = {
            // Future that resolves on the next change, with the channel
            // error translated into this crate's `Error` type.
            let changed_fut = self.awaited_action_rx.changed().map(|r| {
                r.map_err(|e| {
                    make_err!(
                        Code::Internal,
                        "Failed to wait for awaited action to change {e:?}"
                    )
                })
            });
            // No client attached: plain wait, no keep-alive bookkeeping and
            // no client operation id to fill in.
            let Some(client_info) = self.client_info.as_mut() else {
                changed_fut.await?;
                return Ok(self.awaited_action_rx.borrow().clone());
            };
            // Pinned so the same future can be polled by reference across
            // several iterations of the loop below.
            tokio::pin!(changed_fut);
            loop {
                if client_info.last_keep_alive.elapsed() > CLIENT_KEEPALIVE_DURATION {
                    client_info.last_keep_alive = (client_info.now_fn)();
                    // Failing to send just means our receiver dropped.
                    drop(client_info.event_tx.send(ActionEvent::ClientKeepAlive(
                        client_info.client_operation_id.clone(),
                    )));
                }
                // Wake up again after one keep-alive period even if nothing
                // changed, so the check above is re-evaluated.
                let sleep_fut = (client_info.now_fn)().sleep(CLIENT_KEEPALIVE_DURATION);
                tokio::select! {
                    result = &mut changed_fut => {
                        result?;
                        break;
                    }
                    () = sleep_fut => {
                        // If we haven't received any updates for a while, we should
                        // let the database know that we are still listening to prevent
                        // the action from being dropped.
                    }
                }
            }
            client_info.client_operation_id.clone()
        };
        // At this stage we know that this event is a client request, so we need
        // to populate the client_operation_id.
        let mut awaited_action = self.awaited_action_rx.borrow().clone();
        awaited_action.set_client_operation_id(client_operation_id);
        Ok(awaited_action)
    }

    /// Returns a clone of the current `AwaitedAction` without waiting.
    ///
    /// If this subscriber belongs to a client, the clone has its client
    /// operation id set to that client's id. This implementation always
    /// returns `Ok`.
    async fn borrow(&self) -> Result<AwaitedAction, Error> {
        let mut awaited_action = self.awaited_action_rx.borrow().clone();
        if let Some(client_info) = self.client_info.as_ref() {
            awaited_action.set_client_operation_id(client_info.client_operation_id.clone());
        }
        Ok(awaited_action)
    }
}
222 | | |
/// A struct that is used to keep the developer from trying to
/// return early from a function.
///
/// It is the return type of `handle_action_events`. Because that function
/// returns this marker rather than a `Result`, the `?` operator cannot be
/// used inside it to bail out in the middle of processing a batch of events.
struct NoEarlyReturn;
226 | | |
/// Sorted sets of awaited actions, one set per stage bucket.
///
/// `btree_for_state` maps an `ActionStage` to its set; both
/// `ActionStage::Completed(_)` and `ActionStage::CompletedFromCache(_)` are
/// stored in `completed`.
#[derive(Debug, Default, MetricsComponent)]
struct SortedAwaitedActions {
    /// Actions whose stage is `ActionStage::Unknown`.
    #[metric(group = "unknown")]
    unknown: BTreeSet<SortedAwaitedAction>,
    /// Actions whose stage is `ActionStage::CacheCheck`.
    #[metric(group = "cache_check")]
    cache_check: BTreeSet<SortedAwaitedAction>,
    /// Actions whose stage is `ActionStage::Queued`.
    #[metric(group = "queued")]
    queued: BTreeSet<SortedAwaitedAction>,
    /// Actions whose stage is `ActionStage::Executing`.
    #[metric(group = "executing")]
    executing: BTreeSet<SortedAwaitedAction>,
    /// Actions whose stage is `ActionStage::Completed(_)` or
    /// `ActionStage::CompletedFromCache(_)`.
    #[metric(group = "completed")]
    completed: BTreeSet<SortedAwaitedAction>,
}
240 | | |
241 | | impl SortedAwaitedActions { |
242 | 39 | const fn btree_for_state(&mut self, state: &ActionStage) -> &mut BTreeSet<SortedAwaitedAction> { |
243 | 39 | match state { |
244 | 0 | ActionStage::Unknown => &mut self.unknown, |
245 | 0 | ActionStage::CacheCheck => &mut self.cache_check, |
246 | 26 | ActionStage::Queued => &mut self.queued, |
247 | 13 | ActionStage::Executing => &mut self.executing, |
248 | 0 | ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => &mut self.completed, |
249 | | } |
250 | 39 | } |
251 | | |
252 | 63 | fn insert_sort_map_for_stage( |
253 | 63 | &mut self, |
254 | 63 | stage: &ActionStage, |
255 | 63 | sorted_awaited_action: &SortedAwaitedAction, |
256 | 63 | ) -> Result<(), Error> { |
257 | 63 | let newly_inserted = match stage { |
258 | 0 | ActionStage::Unknown => self.unknown.insert(sorted_awaited_action.clone()), |
259 | 0 | ActionStage::CacheCheck => self.cache_check.insert(sorted_awaited_action.clone()), |
260 | 31 | ActionStage::Queued => self.queued.insert(sorted_awaited_action.clone()), |
261 | 26 | ActionStage::Executing => self.executing.insert(sorted_awaited_action.clone()), |
262 | 6 | ActionStage::Completed(_) => self.completed.insert(sorted_awaited_action.clone()), |
263 | | ActionStage::CompletedFromCache(_) => { |
264 | 0 | self.completed.insert(sorted_awaited_action.clone()) |
265 | | } |
266 | | }; |
267 | 63 | if !newly_inserted { Branch (267:12): [True: 0, False: 63]
Branch (267:12): [Folded - Ignored]
|
268 | 0 | return Err(make_err!( |
269 | 0 | Code::Internal, |
270 | 0 | "Tried to insert an action that was already in the sorted map. This should never happen. {:?} - {:?}", |
271 | 0 | stage, |
272 | 0 | sorted_awaited_action |
273 | 0 | )); |
274 | 63 | } |
275 | 63 | Ok(()) |
276 | 63 | } |
277 | | |
278 | 39 | fn process_state_changes( |
279 | 39 | &mut self, |
280 | 39 | old_awaited_action: &AwaitedAction, |
281 | 39 | new_awaited_action: &AwaitedAction, |
282 | 39 | ) -> Result<(), Error> { |
283 | 39 | let btree = self.btree_for_state(&old_awaited_action.state().stage); |
284 | 39 | let maybe_sorted_awaited_action = btree.take(&SortedAwaitedAction { |
285 | 39 | sort_key: old_awaited_action.sort_key(), |
286 | 39 | operation_id: new_awaited_action.operation_id().clone(), |
287 | 39 | }); |
288 | | |
289 | 39 | let Some(sorted_awaited_action) = maybe_sorted_awaited_action else { Branch (289:13): [True: 39, False: 0]
Branch (289:13): [Folded - Ignored]
|
290 | 0 | return Err(make_err!( |
291 | 0 | Code::Internal, |
292 | 0 | "sorted_action_info_hash_keys and action_info_hash_key_to_awaited_action are out of sync - {} - {:?}", |
293 | 0 | new_awaited_action.operation_id(), |
294 | 0 | new_awaited_action, |
295 | 0 | )); |
296 | | }; |
297 | | |
298 | 39 | self.insert_sort_map_for_stage(&new_awaited_action.state().stage, &sorted_awaited_action) |
299 | 39 | .err_tip(|| "In AwaitedActionDb::update_awaited_action")?0 ; |
300 | 39 | Ok(()) |
301 | 39 | } |
302 | | } |
303 | | |
/// The database for storing the state of all actions.
///
/// The maps below are kept in sync by the methods of this type; several of
/// them log an "out of sync" error when they detect a mismatch.
#[derive(Debug, MetricsComponent)]
pub struct AwaitedActionDbImpl<I: InstantWrapper, NowFn: Fn() -> I> {
    /// A lookup table to lookup the state of an action by its client operation id.
    /// The stored `ClientAwaitedAction` holds the operation id the client is
    /// attached to and reports on its channel when it is dropped.
    #[metric(group = "client_operation_ids")]
    client_operation_to_awaited_action:
        EvictingMap<OperationId, OperationId, Arc<ClientAwaitedAction>, I>,

    /// A lookup table to lookup the state of an action by its worker operation id.
    /// Each value is the `watch` sender holding the current `AwaitedAction`.
    #[metric(group = "operation_ids")]
    operation_id_to_awaited_action: BTreeMap<OperationId, watch::Sender<AwaitedAction>>,

    /// A lookup table to lookup the state of an action by its unique qualifier.
    #[metric(group = "action_info_hash_key_to_awaited_action")]
    action_info_hash_key_to_awaited_action: HashMap<ActionUniqueKey, OperationId>,

    /// A sorted set of [`AwaitedAction`]s. A wrapper is used to perform sorting
    /// based on the [`AwaitedActionSortKey`] of the [`AwaitedAction`].
    ///
    /// See [`AwaitedActionSortKey`] for more information on the ordering.
    #[metric(group = "sorted_action_infos")]
    sorted_action_info_hash_keys: SortedAwaitedActions,

    /// The number of connected clients for each operation id.
    #[metric(group = "connected_clients_for_operation_id")]
    connected_clients_for_operation_id: HashMap<OperationId, usize>,

    /// Where to send notifications about important events related to actions.
    action_event_tx: mpsc::UnboundedSender<ActionEvent>,

    /// The function to get the current time.
    now_fn: NowFn,
}
337 | | |
338 | | impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbImpl<I, NowFn> { |
339 | 3 | async fn get_awaited_action_by_id( |
340 | 3 | &self, |
341 | 3 | client_operation_id: &OperationId, |
342 | 3 | ) -> Result<Option<MemoryAwaitedActionSubscriber<I, NowFn>>, Error> { |
343 | 3 | let maybe_client_awaited_action = self |
344 | 3 | .client_operation_to_awaited_action |
345 | 3 | .get(client_operation_id) |
346 | 3 | .await; |
347 | 3 | let Some(client_awaited_action) = maybe_client_awaited_action else { Branch (347:13): [True: 0, False: 0]
Branch (347:13): [Folded - Ignored]
Branch (347:13): [True: 3, False: 0]
|
348 | 0 | return Ok(None); |
349 | | }; |
350 | | |
351 | 3 | self.operation_id_to_awaited_action |
352 | 3 | .get(client_awaited_action.operation_id()) |
353 | 3 | .map(|tx| { |
354 | 3 | Some(MemoryAwaitedActionSubscriber::new_with_client( |
355 | 3 | tx.subscribe(), |
356 | 3 | client_operation_id.clone(), |
357 | 3 | self.action_event_tx.clone(), |
358 | 3 | self.now_fn.clone(), |
359 | 3 | )) |
360 | 3 | }) |
361 | 3 | .ok_or_else(|| {0 |
362 | 0 | make_err!( |
363 | 0 | Code::Internal, |
364 | | "Failed to get client operation id {client_operation_id:?}" |
365 | | ) |
366 | 0 | }) |
367 | 3 | } |
368 | | |
/// Processes action events that need to be handled by the database.
///
/// * `ClientDroppedOperation`: decrements the connected-client count of the
///   operation. If clients remain, the operation is put back untouched;
///   otherwise the operation is removed from the operation map, the
///   hash-key map (cacheable actions only) and the sorted set of its stage.
/// * `ClientKeepAlive`: refreshes the keep-alive timestamp stored in the
///   `AwaitedAction` the client is attached to.
///
/// Inconsistencies between the maps are logged with `error!` and processing
/// continues with the next event. The function returns `NoEarlyReturn`, so
/// `?` cannot be used to abort in the middle of a batch.
async fn handle_action_events(
    &mut self,
    action_events: impl IntoIterator<Item = ActionEvent>,
) -> NoEarlyReturn {
    for action in action_events {
        debug!(?action, "Handling action");
        match action {
            ActionEvent::ClientDroppedOperation(operation_id) => {
                // Cleanup operation_id_to_awaited_action.
                // The sender is removed optimistically and re-inserted below
                // if other clients are still connected.
                let Some(tx) = self.operation_id_to_awaited_action.remove(&operation_id) else {
                    error!(
                        %operation_id,
                        "operation_id_to_awaited_action does not have operation_id"
                    );
                    continue;
                };

                // Remove the counter entry and compute the remaining number
                // of connected clients.
                // NOTE(review): `value - 1` underflows if a stored count is
                // ever 0; the visible code only inserts counts >= 1 — confirm
                // no other writer can store 0.
                let connected_clients = match self
                    .connected_clients_for_operation_id
                    .entry(operation_id.clone())
                {
                    Entry::Occupied(entry) => {
                        let value = *entry.get();
                        entry.remove();
                        value - 1
                    }
                    Entry::Vacant(_) => {
                        error!(
                            %operation_id,
                            "connected_clients_for_operation_id does not have operation_id"
                        );
                        0
                    }
                };

                // Note: It is rare to have more than one client listening
                // to the same action, so we assume that we are the last
                // client and insert it back into the map if we detect that
                // there are still clients listening (ie: the happy path
                // is operation.connected_clients == 0).
                if connected_clients != 0 {
                    self.operation_id_to_awaited_action
                        .insert(operation_id.clone(), tx);
                    self.connected_clients_for_operation_id
                        .insert(operation_id, connected_clients);
                    continue;
                }
                debug!(%operation_id, "Clearing operation from state manager");
                let awaited_action = tx.borrow().clone();
                // Cleanup action_info_hash_key_to_awaited_action if it was marked cached.
                match &awaited_action.action_info().unique_qualifier {
                    ActionUniqueQualifier::Cacheable(action_key) => {
                        // NOTE(review): this removes whatever operation is
                        // currently mapped to `action_key` without checking
                        // that it is `operation_id`; verify a newer operation
                        // with the same key cannot be registered while a
                        // finished one is still awaiting cleanup.
                        let maybe_awaited_action = self
                            .action_info_hash_key_to_awaited_action
                            .remove(action_key);
                        // A missing entry is only unexpected for unfinished
                        // actions; finished ones are already removed from the
                        // hash-key map when their stage changes.
                        if !awaited_action.state().stage.is_finished()
                            && maybe_awaited_action.is_none()
                        {
                            error!(
                                %operation_id,
                                ?awaited_action,
                                ?action_key,
                                "action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync",
                            );
                        }
                    }
                    ActionUniqueQualifier::Uncacheable(_action_key) => {
                        // This Operation should not be in the hash_key map.
                    }
                }

                // Cleanup sorted_awaited_action.
                let sort_key = awaited_action.sort_key();
                let sort_btree_for_state = self
                    .sorted_action_info_hash_keys
                    .btree_for_state(&awaited_action.state().stage);

                let maybe_sorted_awaited_action =
                    sort_btree_for_state.take(&SortedAwaitedAction {
                        sort_key,
                        operation_id: operation_id.clone(),
                    });
                if maybe_sorted_awaited_action.is_none() {
                    error!(
                        %operation_id,
                        ?sort_key,
                        "Expected maybe_sorted_awaited_action to have {sort_key:?}",
                    );
                }
            }
            ActionEvent::ClientKeepAlive(client_id) => {
                if let Some(client_awaited_action) = self
                    .client_operation_to_awaited_action
                    .get(&client_id)
                    .await
                {
                    // A missing sender is silently ignored here.
                    if let Some(awaited_action_sender) = self
                        .operation_id_to_awaited_action
                        .get(&client_awaited_action.operation_id)
                    {
                        // The closure returns `false`, i.e. it reports the
                        // value as "not modified", so watchers are not woken
                        // for a pure keep-alive refresh.
                        awaited_action_sender.send_if_modified(|awaited_action| {
                            awaited_action.update_client_keep_alive((self.now_fn)().now());
                            false
                        });
                    }
                } else {
                    error!(
                        ?client_id,
                        "client_operation_to_awaited_action does not have client_id",
                    );
                }
            }
        }
    }
    NoEarlyReturn
}
486 | | |
487 | 1 | fn get_awaited_actions_range( |
488 | 1 | &self, |
489 | 1 | start: Bound<&OperationId>, |
490 | 1 | end: Bound<&OperationId>, |
491 | 1 | ) -> impl Iterator<Item = (&'_ OperationId, MemoryAwaitedActionSubscriber<I, NowFn>)> |
492 | 1 | + use<'_, I, NowFn> { |
493 | 1 | self.operation_id_to_awaited_action |
494 | 1 | .range((start, end)) |
495 | 1 | .map(|(operation_id, tx)| {0 |
496 | 0 | ( |
497 | 0 | operation_id, |
498 | 0 | MemoryAwaitedActionSubscriber::<I, NowFn>::new(tx.subscribe()), |
499 | 0 | ) |
500 | 0 | }) |
501 | 1 | } |
502 | | |
503 | 584 | fn get_by_operation_id( |
504 | 584 | &self, |
505 | 584 | operation_id: &OperationId, |
506 | 584 | ) -> Option<MemoryAwaitedActionSubscriber<I, NowFn>> { |
507 | 584 | self.operation_id_to_awaited_action |
508 | 584 | .get(operation_id) |
509 | 584 | .map(|tx| MemoryAwaitedActionSubscriber::<I, NowFn>::new583 (tx583 .subscribe583 ())) |
510 | 584 | } |
511 | | |
512 | 1.13k | fn get_range_of_actions( |
513 | 1.13k | &self, |
514 | 1.13k | state: SortedAwaitedActionState, |
515 | 1.13k | range: impl RangeBounds<SortedAwaitedAction>, |
516 | 1.13k | ) -> impl DoubleEndedIterator< |
517 | 1.13k | Item = Result< |
518 | 1.13k | ( |
519 | 1.13k | &SortedAwaitedAction, |
520 | 1.13k | MemoryAwaitedActionSubscriber<I, NowFn>, |
521 | 1.13k | ), |
522 | 1.13k | Error, |
523 | 1.13k | >, |
524 | 1.13k | > { |
525 | 1.13k | let btree = match state { |
526 | 0 | SortedAwaitedActionState::CacheCheck => &self.sorted_action_info_hash_keys.cache_check, |
527 | 1.13k | SortedAwaitedActionState::Queued => &self.sorted_action_info_hash_keys.queued, |
528 | 0 | SortedAwaitedActionState::Executing => &self.sorted_action_info_hash_keys.executing, |
529 | 0 | SortedAwaitedActionState::Completed => &self.sorted_action_info_hash_keys.completed, |
530 | | }; |
531 | 1.13k | btree.range(range).map(|sorted_awaited_action| {543 |
532 | 543 | let operation_id = &sorted_awaited_action.operation_id; |
533 | 543 | self.get_by_operation_id(operation_id) |
534 | 543 | .ok_or_else(|| {0 |
535 | 0 | make_err!( |
536 | 0 | Code::Internal, |
537 | | "Failed to get operation id {}", |
538 | | operation_id |
539 | | ) |
540 | 0 | }) |
541 | 543 | .map(|subscriber| (sorted_awaited_action, subscriber)) |
542 | 543 | }) |
543 | 1.13k | } |
544 | | |
545 | 39 | fn process_state_changes_for_hash_key_map( |
546 | 39 | action_info_hash_key_to_awaited_action: &mut HashMap<ActionUniqueKey, OperationId>, |
547 | 39 | new_awaited_action: &AwaitedAction, |
548 | 39 | ) { |
549 | | // Only process changes if the stage is not finished. |
550 | 39 | if !new_awaited_action.state().stage.is_finished() { Branch (550:12): [True: 0, False: 0]
Branch (550:12): [Folded - Ignored]
Branch (550:12): [True: 0, False: 0]
Branch (550:12): [True: 33, False: 6]
|
551 | 33 | return; |
552 | 6 | } |
553 | 6 | match &new_awaited_action.action_info().unique_qualifier { |
554 | 6 | ActionUniqueQualifier::Cacheable(action_key) => { |
555 | 6 | let maybe_awaited_action = |
556 | 6 | action_info_hash_key_to_awaited_action.remove(action_key); |
557 | 6 | match maybe_awaited_action { |
558 | 6 | Some(removed_operation_id) => { |
559 | 6 | if &removed_operation_id != new_awaited_action.operation_id() { Branch (559:28): [True: 0, False: 0]
Branch (559:28): [Folded - Ignored]
Branch (559:28): [True: 0, False: 0]
Branch (559:28): [True: 0, False: 6]
|
560 | 0 | error!( |
561 | | ?removed_operation_id, |
562 | | ?new_awaited_action, |
563 | | ?action_key, |
564 | 0 | "action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync", |
565 | | ); |
566 | 6 | } |
567 | | } |
568 | | None => { |
569 | 0 | error!( |
570 | | ?new_awaited_action, |
571 | | ?action_key, |
572 | 0 | "action_info_hash_key_to_awaited_action out of sync, it should have had the unique_key", |
573 | | ); |
574 | | } |
575 | | } |
576 | | } |
577 | 0 | ActionUniqueQualifier::Uncacheable(_action_key) => { |
578 | 0 | // If we are not cacheable, the action should not be in the |
579 | 0 | // hash_key map, so we don't need to process anything in |
580 | 0 | // action_info_hash_key_to_awaited_action. |
581 | 0 | } |
582 | | } |
583 | 39 | } |
584 | | |
/// Replaces the stored state of an operation with `new_awaited_action`.
///
/// The update is applied only if `new_awaited_action` carries the same
/// version as the stored action; its version is then incremented before it
/// is stored. When the stage changes, stage-transition metrics are
/// recorded, the sorted entry is moved to the new stage's set and, for
/// finished cacheable actions, the hash-key entry is removed. Finally the
/// new value is published on the operation's watch channel.
///
/// # Errors
///
/// * `Code::Internal` if the operation id is not in the operation map.
/// * `Code::Aborted` if the versions do not match.
/// * An error from `error_if!` if the unique qualifier differs between the
///   stored and the new action.
/// * Any error from `SortedAwaitedActions::process_state_changes`.
fn update_awaited_action(
    &mut self,
    mut new_awaited_action: AwaitedAction,
) -> Result<(), Error> {
    let tx = self
        .operation_id_to_awaited_action
        .get(new_awaited_action.operation_id())
        .ok_or_else(|| {
            make_err!(
                Code::Internal,
                "OperationId does not exist in map in AwaitedActionDb::update_awaited_action"
            )
        })?;
    {
        // Note: It's important to drop old_awaited_action before we call
        // send_replace or we will have a deadlock.
        let old_awaited_action = tx.borrow();

        // Do not process changes if the action version is not in sync with
        // what the sender based the update on.
        if old_awaited_action.version() != new_awaited_action.version() {
            return Err(make_err!(
                // From: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
                // Use ABORTED if the client should retry at a higher level
                // (e.g., when a client-specified test-and-set fails,
                // indicating the client should restart a read-modify-write
                // sequence)
                Code::Aborted,
                "{} Expected {} but got {} for operation_id {:?} - {:?}",
                "Tried to update an awaited action with an incorrect version.",
                old_awaited_action.version(),
                new_awaited_action.version(),
                old_awaited_action,
                new_awaited_action,
            ));
        }
        new_awaited_action.increment_version();

        // The unique qualifier of an operation must never change.
        error_if!(
            old_awaited_action.action_info().unique_qualifier
                != new_awaited_action.action_info().unique_qualifier,
            "Unique key changed for operation_id {:?} - {:?} - {:?}",
            new_awaited_action.operation_id(),
            old_awaited_action.action_info(),
            new_awaited_action.action_info(),
        );
        let is_same_stage = old_awaited_action
            .state()
            .stage
            .is_same_stage(&new_awaited_action.state().stage);

        if !is_same_stage {
            // NOTE(review): the metrics below are emitted before the sorted
            // set is updated; if `process_state_changes` fails, the counters
            // have already been adjusted — confirm this is acceptable.

            // Record metrics for stage transitions
            let metrics = &*EXECUTION_METRICS;
            let old_stage = &old_awaited_action.state().stage;
            let new_stage = &new_awaited_action.state().stage;

            // Track stage transitions
            let base_attrs = make_execution_attributes(
                "unknown",
                None,
                Some(old_awaited_action.action_info().priority),
            );
            metrics.execution_stage_transitions.add(1, &base_attrs);

            // Update active count for old stage
            let old_stage_attrs = vec![opentelemetry::KeyValue::new(
                nativelink_util::metrics::EXECUTION_STAGE,
                ExecutionStage::from(old_stage),
            )];
            metrics.execution_active_count.add(-1, &old_stage_attrs);

            // Update active count for new stage
            let new_stage_attrs = vec![opentelemetry::KeyValue::new(
                nativelink_util::metrics::EXECUTION_STAGE,
                ExecutionStage::from(new_stage),
            )];
            metrics.execution_active_count.add(1, &new_stage_attrs);

            // Record completion metrics with action digest for failure tracking
            let action_digest = old_awaited_action.action_info().digest().to_string();
            if let ActionStage::Completed(action_result) = new_stage {
                // Executed actions count as success or failure depending on
                // whether the exit code is zero.
                let result_attrs = vec![
                    opentelemetry::KeyValue::new(
                        nativelink_util::metrics::EXECUTION_RESULT,
                        if action_result.exit_code == 0 {
                            ExecutionResult::Success
                        } else {
                            ExecutionResult::Failure
                        },
                    ),
                    opentelemetry::KeyValue::new(
                        nativelink_util::metrics::EXECUTION_ACTION_DIGEST,
                        action_digest,
                    ),
                ];
                metrics.execution_completed_count.add(1, &result_attrs);
            } else if let ActionStage::CompletedFromCache(_) = new_stage {
                let result_attrs = vec![
                    opentelemetry::KeyValue::new(
                        nativelink_util::metrics::EXECUTION_RESULT,
                        ExecutionResult::CacheHit,
                    ),
                    opentelemetry::KeyValue::new(
                        nativelink_util::metrics::EXECUTION_ACTION_DIGEST,
                        action_digest,
                    ),
                ];
                metrics.execution_completed_count.add(1, &result_attrs);
            }

            self.sorted_action_info_hash_keys
                .process_state_changes(&old_awaited_action, &new_awaited_action)?;
            // Called as an associated function on a single field so that it
            // does not need a borrow of all of `self` while `tx` is alive.
            Self::process_state_changes_for_hash_key_map(
                &mut self.action_info_hash_key_to_awaited_action,
                &new_awaited_action,
            );
        }
    }

    // Notify all listeners of the new state and ignore if no one is listening.
    // Note: Do not use `.send()` as it will not update the state if all listeners
    // are dropped.
    drop(tx.send_replace(new_awaited_action));

    Ok(())
}
712 | | |
713 | | /// Creates a new [`ClientAwaitedAction`] and a [`watch::Receiver`] to |
714 | | /// listen for changes. We don't do this in-line because it is important |
715 | | /// to ALWAYS construct a [`ClientAwaitedAction`] before inserting it into |
716 | | /// the map. Failing to do so may result in memory leaks. This is because |
717 | | /// [`ClientAwaitedAction`] implements a drop function that will trigger |
718 | | /// cleanup of the other maps on drop. |
719 | 24 | fn make_client_awaited_action( |
720 | 24 | &mut self, |
721 | 24 | operation_id: &OperationId, |
722 | 24 | awaited_action: AwaitedAction, |
723 | 24 | ) -> (Arc<ClientAwaitedAction>, watch::Receiver<AwaitedAction>) { |
724 | 24 | let (tx, rx) = watch::channel(awaited_action); |
725 | 24 | let client_awaited_action = Arc::new(ClientAwaitedAction::new( |
726 | 24 | operation_id.clone(), |
727 | 24 | self.action_event_tx.clone(), |
728 | | )); |
729 | 24 | self.operation_id_to_awaited_action |
730 | 24 | .insert(operation_id.clone(), tx); |
731 | 24 | self.connected_clients_for_operation_id |
732 | 24 | .insert(operation_id.clone(), 1); |
733 | 24 | (client_awaited_action, rx) |
734 | 24 | } |
735 | | |
736 | 27 | async fn add_action( |
737 | 27 | &mut self, |
738 | 27 | client_operation_id: OperationId, |
739 | 27 | action_info: Arc<ActionInfo>, |
740 | 27 | ) -> Result<MemoryAwaitedActionSubscriber<I, NowFn>, Error> { |
741 | | // Check to see if the action is already known and subscribe if it is. |
742 | 27 | let subscription_result = self |
743 | 27 | .try_subscribe( |
744 | 27 | &client_operation_id, |
745 | 27 | &action_info.unique_qualifier, |
746 | 27 | action_info.priority, |
747 | 27 | ) |
748 | 27 | .await |
749 | 27 | .err_tip(|| "In AwaitedActionDb::subscribe_or_add_action"); |
750 | 27 | match subscription_result { |
751 | 0 | Err(err) => return Err(err), |
752 | 3 | Ok(Some(subscription)) => return Ok(subscription), |
753 | 24 | Ok(None) => { /* Add item to queue. */ } |
754 | | } |
755 | | |
756 | 24 | let maybe_unique_key = match &action_info.unique_qualifier { |
757 | 24 | ActionUniqueQualifier::Cacheable(unique_key) => Some(unique_key.clone()), |
758 | 0 | ActionUniqueQualifier::Uncacheable(_unique_key) => None, |
759 | | }; |
760 | 24 | let operation_id = OperationId::default(); |
761 | 24 | let awaited_action = AwaitedAction::new( |
762 | 24 | operation_id.clone(), |
763 | 24 | action_info.clone(), |
764 | 24 | (self.now_fn)().now(), |
765 | | ); |
766 | 24 | debug_assert!( |
767 | 0 | ActionStage::Queued == awaited_action.state().stage, |
768 | 0 | "Expected action to be queued" |
769 | | ); |
770 | 24 | let sort_key = awaited_action.sort_key(); |
771 | | |
772 | 24 | let (client_awaited_action, rx) = |
773 | 24 | self.make_client_awaited_action(&operation_id.clone(), awaited_action); |
774 | | |
775 | 24 | debug!( |
776 | | %client_operation_id, |
777 | | %operation_id, |
778 | | ?client_awaited_action, |
779 | 24 | "Adding action" |
780 | | ); |
781 | | |
782 | 24 | self.client_operation_to_awaited_action |
783 | 24 | .insert(client_operation_id.clone(), client_awaited_action) |
784 | 24 | .await; |
785 | | |
786 | | // Note: We only put items in the map that are cacheable. |
787 | 24 | if let Some(unique_key) = maybe_unique_key { Branch (787:16): [True: 0, False: 0]
Branch (787:16): [Folded - Ignored]
Branch (787:16): [True: 24, False: 0]
|
788 | 24 | let old_value = self |
789 | 24 | .action_info_hash_key_to_awaited_action |
790 | 24 | .insert(unique_key, operation_id.clone()); |
791 | 24 | if let Some(old_value0 ) = old_value { Branch (791:20): [True: 0, False: 0]
Branch (791:20): [Folded - Ignored]
Branch (791:20): [True: 0, False: 24]
|
792 | 0 | error!( |
793 | | %operation_id, |
794 | | ?old_value, |
795 | 0 | "action_info_hash_key_to_awaited_action already has unique_key" |
796 | | ); |
797 | 24 | } |
798 | 0 | } |
799 | | |
800 | | // Record metric for new action entering the queue |
801 | 24 | let metrics = &*EXECUTION_METRICS; |
802 | 24 | let _base_attrs = make_execution_attributes("unknown", None, Some(action_info.priority)); |
803 | 24 | let queued_attrs = vec![opentelemetry::KeyValue::new( |
804 | | nativelink_util::metrics::EXECUTION_STAGE, |
805 | 24 | ExecutionStage::Queued, |
806 | | )]; |
807 | 24 | metrics.execution_active_count.add(1, &queued_attrs); |
808 | | |
809 | 24 | self.sorted_action_info_hash_keys |
810 | 24 | .insert_sort_map_for_stage( |
811 | 24 | &ActionStage::Queued, |
812 | 24 | &SortedAwaitedAction { |
813 | 24 | sort_key, |
814 | 24 | operation_id, |
815 | 24 | }, |
816 | | ) |
817 | 24 | .err_tip(|| "In AwaitedActionDb::subscribe_or_add_action")?0 ; |
818 | | |
819 | 24 | Ok(MemoryAwaitedActionSubscriber::new_with_client( |
820 | 24 | rx, |
821 | 24 | client_operation_id, |
822 | 24 | self.action_event_tx.clone(), |
823 | 24 | self.now_fn.clone(), |
824 | 24 | )) |
825 | 27 | } |
826 | | |
827 | 27 | async fn try_subscribe( |
828 | 27 | &mut self, |
829 | 27 | client_operation_id: &OperationId, |
830 | 27 | unique_qualifier: &ActionUniqueQualifier, |
831 | 27 | // TODO(palfrey) To simplify the scheduler 2024 refactor, we |
832 | 27 | // removed the ability to upgrade priorities of actions. |
833 | 27 | // We should add priority upgrades back in. |
834 | 27 | _priority: i32, |
835 | 27 | ) -> Result<Option<MemoryAwaitedActionSubscriber<I, NowFn>>, Error> { |
836 | 27 | let unique_key = match unique_qualifier { |
837 | 27 | ActionUniqueQualifier::Cacheable(unique_key) => unique_key, |
838 | 0 | ActionUniqueQualifier::Uncacheable(_unique_key) => return Ok(None), |
839 | | }; |
840 | | |
841 | 27 | let Some(operation_id3 ) = self.action_info_hash_key_to_awaited_action.get(unique_key) else { Branch (841:13): [True: 0, False: 0]
Branch (841:13): [Folded - Ignored]
Branch (841:13): [True: 3, False: 24]
|
842 | 24 | return Ok(None); // Not currently running. |
843 | | }; |
844 | | |
845 | 3 | let Some(tx) = self.operation_id_to_awaited_action.get(operation_id) else { Branch (845:13): [True: 0, False: 0]
Branch (845:13): [Folded - Ignored]
Branch (845:13): [True: 3, False: 0]
|
846 | 0 | return Err(make_err!( |
847 | 0 | Code::Internal, |
848 | 0 | "operation_id_to_awaited_action and action_info_hash_key_to_awaited_action are out of sync for {unique_key:?} - {operation_id}" |
849 | 0 | )); |
850 | | }; |
851 | | |
852 | 0 | error_if!( |
853 | 3 | tx.borrow().state().stage.is_finished(), Branch (853:13): [True: 0, False: 0]
Branch (853:13): [Folded - Ignored]
Branch (853:13): [True: 0, False: 3]
|
854 | | "Tried to subscribe to a completed action but it already finished. This should never happen. {:?}", |
855 | 0 | tx.borrow() |
856 | | ); |
857 | | |
858 | 3 | let maybe_connected_clients = self |
859 | 3 | .connected_clients_for_operation_id |
860 | 3 | .get_mut(operation_id); |
861 | 3 | let Some(connected_clients) = maybe_connected_clients else { Branch (861:13): [True: 0, False: 0]
Branch (861:13): [Folded - Ignored]
Branch (861:13): [True: 3, False: 0]
|
862 | 0 | return Err(make_err!( |
863 | 0 | Code::Internal, |
864 | 0 | "connected_clients_for_operation_id and operation_id_to_awaited_action are out of sync for {unique_key:?} - {operation_id}" |
865 | 0 | )); |
866 | | }; |
867 | 3 | *connected_clients += 1; |
868 | | |
869 | | // Immediately mark the keep alive, we don't need to wake anyone |
870 | | // so we always fake that it was not actually changed. |
871 | | // Failing to update the client could lead to the client connecting |
872 | | // then not updating the keep alive in time, resulting in the |
873 | | // operation timing out due to async behavior. |
874 | 3 | tx.send_if_modified(|awaited_action| { |
875 | 3 | awaited_action.update_client_keep_alive((self.now_fn)().now()); |
876 | 3 | false |
877 | 3 | }); |
878 | 3 | let subscription = tx.subscribe(); |
879 | | |
880 | 3 | self.client_operation_to_awaited_action |
881 | 3 | .insert( |
882 | 3 | client_operation_id.clone(), |
883 | 3 | Arc::new(ClientAwaitedAction::new( |
884 | 3 | operation_id.clone(), |
885 | 3 | self.action_event_tx.clone(), |
886 | 3 | )), |
887 | 3 | ) |
888 | 3 | .await; |
889 | | |
890 | 3 | Ok(Some(MemoryAwaitedActionSubscriber::new_with_client( |
891 | 3 | subscription, |
892 | 3 | client_operation_id.clone(), |
893 | 3 | self.action_event_tx.clone(), |
894 | 3 | self.now_fn.clone(), |
895 | 3 | ))) |
896 | 27 | } |
897 | | } |
898 | | |
899 | | #[derive(Debug, MetricsComponent)] |
900 | | pub struct MemoryAwaitedActionDb<I: InstantWrapper, NowFn: Fn() -> I> { |
901 | | #[metric] |
902 | | inner: Arc<Mutex<AwaitedActionDbImpl<I, NowFn>>>, |
903 | | tasks_change_notify: Arc<Notify>, |
904 | | _handle_awaited_action_events: JoinHandleDropGuard<()>, |
905 | | } |
906 | | |
907 | | impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> |
908 | | MemoryAwaitedActionDb<I, NowFn> |
909 | | { |
910 | 22 | pub fn new( |
911 | 22 | eviction_config: &EvictionPolicy, |
912 | 22 | tasks_change_notify: Arc<Notify>, |
913 | 22 | now_fn: NowFn, |
914 | 22 | ) -> Self { |
915 | 22 | let (action_event_tx, mut action_event_rx) = mpsc::unbounded_channel(); |
916 | 22 | let inner = Arc::new(Mutex::new(AwaitedActionDbImpl { |
917 | 22 | client_operation_to_awaited_action: EvictingMap::new(eviction_config, (now_fn)()), |
918 | 22 | operation_id_to_awaited_action: BTreeMap::new(), |
919 | 22 | action_info_hash_key_to_awaited_action: HashMap::new(), |
920 | 22 | sorted_action_info_hash_keys: SortedAwaitedActions::default(), |
921 | 22 | connected_clients_for_operation_id: HashMap::new(), |
922 | 22 | action_event_tx, |
923 | 22 | now_fn, |
924 | 22 | })); |
925 | 22 | let weak_inner = Arc::downgrade(&inner); |
926 | | Self { |
927 | 22 | inner, |
928 | 22 | tasks_change_notify, |
929 | 22 | _handle_awaited_action_events: spawn!("handle_awaited_action_events", async move {20 |
930 | 20 | let mut dropped_operation_ids = Vec::with_capacity(MAX_ACTION_EVENTS_RX_PER_CYCLE); |
931 | | loop { |
932 | 75 | dropped_operation_ids.clear(); |
933 | 75 | action_event_rx |
934 | 75 | .recv_many(&mut dropped_operation_ids, MAX_ACTION_EVENTS_RX_PER_CYCLE) |
935 | 75 | .await; |
936 | 55 | let Some(inner) = weak_inner.upgrade() else { Branch (936:25): [True: 0, False: 0]
Branch (936:25): [Folded - Ignored]
Branch (936:25): [True: 0, False: 0]
Branch (936:25): [True: 55, False: 0]
|
937 | 0 | return; // Nothing to clean up; our struct is dropped. |
938 | | }; |
939 | 55 | let mut inner = inner.lock().await; |
940 | 55 | inner |
941 | 55 | .handle_action_events(dropped_operation_ids.drain(..)) |
942 | 55 | .await; |
943 | | } |
944 | 0 | }), |
945 | | } |
946 | 22 | } |
947 | | } |
948 | | |
949 | | impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> AwaitedActionDb |
950 | | for MemoryAwaitedActionDb<I, NowFn> |
951 | | { |
952 | | type Subscriber = MemoryAwaitedActionSubscriber<I, NowFn>; |
953 | | |
954 | 3 | async fn get_awaited_action_by_id( |
955 | 3 | &self, |
956 | 3 | client_operation_id: &OperationId, |
957 | 3 | ) -> Result<Option<Self::Subscriber>, Error> { |
958 | 3 | self.inner |
959 | 3 | .lock() |
960 | 3 | .await |
961 | 3 | .get_awaited_action_by_id(client_operation_id) |
962 | 3 | .await |
963 | 3 | } |
964 | | |
965 | 1 | async fn get_all_awaited_actions( |
966 | 1 | &self, |
967 | 1 | ) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>>, Error> { |
968 | 1 | Ok(ChunkedStream::new( |
969 | 1 | Bound::Unbounded, |
970 | 1 | Bound::Unbounded, |
971 | 1 | move |start, end, mut output| async move { |
972 | 1 | let inner = self.inner.lock().await; |
973 | 1 | let mut maybe_new_start = None; |
974 | | |
975 | 0 | for (operation_id, item) in |
976 | 1 | inner.get_awaited_actions_range(start.as_ref(), end.as_ref()) |
977 | 0 | { |
978 | 0 | output.push_back(item); |
979 | 0 | maybe_new_start = Some(operation_id); |
980 | 0 | } |
981 | | |
982 | 1 | Ok(maybe_new_start |
983 | 1 | .map(|new_start| ((Bound::Excluded(new_start.clone()), end)0 , output0 ))) |
984 | 2 | }, |
985 | | )) |
986 | 1 | } |
987 | | |
988 | 41 | async fn get_by_operation_id( |
989 | 41 | &self, |
990 | 41 | operation_id: &OperationId, |
991 | 41 | ) -> Result<Option<Self::Subscriber>, Error> { |
992 | 41 | Ok(self.inner.lock().await.get_by_operation_id(operation_id)) |
993 | 41 | } |
994 | | |
995 | 590 | async fn get_range_of_actions( |
996 | 590 | &self, |
997 | 590 | state: SortedAwaitedActionState, |
998 | 590 | start: Bound<SortedAwaitedAction>, |
999 | 590 | end: Bound<SortedAwaitedAction>, |
1000 | 590 | desc: bool, |
1001 | 590 | ) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>> + Send, Error> { |
1002 | 590 | Ok(ChunkedStream::new( |
1003 | 590 | start, |
1004 | 590 | end, |
1005 | 1.13k | move |start, end, mut output| async move { |
1006 | 1.13k | let inner = self.inner.lock().await; |
1007 | 1.13k | let mut done = true; |
1008 | 1.13k | let mut new_start = start.as_ref(); |
1009 | 1.13k | let mut new_end = end.as_ref(); |
1010 | | |
1011 | 1.13k | let iterator = inner |
1012 | 1.13k | .get_range_of_actions(state, (start.as_ref(), end.as_ref())) |
1013 | 1.13k | .map(|res| res543 .err_tip543 (|| "In AwaitedActionDb::get_range_of_actions")); |
1014 | | |
1015 | | // TODO(palfrey) This should probably use the `.left()/right()` pattern, |
1016 | | // but that doesn't exist in the std or any libraries we use. |
1017 | 1.13k | if desc { Branch (1017:20): [True: 0, False: 0]
Branch (1017:20): [Folded - Ignored]
Branch (1017:20): [True: 130, False: 1.00k]
|
1018 | 130 | for result43 in iterator.rev() { |
1019 | 43 | let (sorted_awaited_action, item) = |
1020 | 43 | result.err_tip(|| "In AwaitedActionDb::get_range_of_actions")?0 ; |
1021 | 43 | output.push_back(item); |
1022 | 43 | new_end = Bound::Excluded(sorted_awaited_action); |
1023 | 43 | done = false; |
1024 | | } |
1025 | | } else { |
1026 | 1.50k | for result500 in iterator { |
1027 | 500 | let (sorted_awaited_action, item) = |
1028 | 500 | result.err_tip(|| "In AwaitedActionDb::get_range_of_actions")?0 ; |
1029 | 500 | output.push_back(item); |
1030 | 500 | new_start = Bound::Excluded(sorted_awaited_action); |
1031 | 500 | done = false; |
1032 | | } |
1033 | | } |
1034 | 1.13k | if done { Branch (1034:20): [True: 0, False: 0]
Branch (1034:20): [Folded - Ignored]
Branch (1034:20): [True: 590, False: 540]
|
1035 | 590 | return Ok(None); |
1036 | 540 | } |
1037 | 540 | Ok(Some(((new_start.cloned(), new_end.cloned()), output))) |
1038 | 2.26k | }, |
1039 | | )) |
1040 | 590 | } |
1041 | | |
1042 | 39 | async fn update_awaited_action(&self, new_awaited_action: AwaitedAction) -> Result<(), Error> { |
1043 | 39 | self.inner |
1044 | 39 | .lock() |
1045 | 39 | .await |
1046 | 39 | .update_awaited_action(new_awaited_action)?0 ; |
1047 | 39 | self.tasks_change_notify.notify_one(); |
1048 | 39 | Ok(()) |
1049 | 39 | } |
1050 | | |
1051 | 27 | async fn add_action( |
1052 | 27 | &self, |
1053 | 27 | client_operation_id: OperationId, |
1054 | 27 | action_info: Arc<ActionInfo>, |
1055 | 27 | _no_event_action_timeout: Duration, |
1056 | 27 | ) -> Result<Self::Subscriber, Error> { |
1057 | 27 | let subscriber = self |
1058 | 27 | .inner |
1059 | 27 | .lock() |
1060 | 27 | .await |
1061 | 27 | .add_action(client_operation_id, action_info) |
1062 | 27 | .await?0 ; |
1063 | 27 | self.tasks_change_notify.notify_one(); |
1064 | 27 | Ok(subscriber) |
1065 | 27 | } |
1066 | | } |