/build/source/nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Line  | Count  | Source  | 
1  |  | // Copyright 2024 The NativeLink Authors. All rights reserved.  | 
2  |  | //  | 
3  |  | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");  | 
4  |  | // you may not use this file except in compliance with the License.  | 
5  |  | // You may obtain a copy of the License at  | 
6  |  | //  | 
7  |  | //    See LICENSE file for details  | 
8  |  | //  | 
9  |  | // Unless required by applicable law or agreed to in writing, software  | 
10  |  | // distributed under the License is distributed on an "AS IS" BASIS,  | 
11  |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
12  |  | // See the License for the specific language governing permissions and  | 
13  |  | // limitations under the License.  | 
14  |  |  | 
15  |  | use std::sync::Arc;  | 
16  |  | use std::time::{SystemTime, UNIX_EPOCH}; | 
17  |  |  | 
18  |  | use nativelink_error::{Error, ResultExt, make_input_err}; | 
19  |  | use nativelink_metric::{ | 
20  |  |     MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,  | 
21  |  | };  | 
22  |  | use nativelink_util::action_messages::{ | 
23  |  |     ActionInfo, ActionStage, ActionState, OperationId, WorkerId,  | 
24  |  | };  | 
25  |  | use nativelink_util::origin_event::OriginMetadata;  | 
26  |  | use opentelemetry::baggage::BaggageExt;  | 
27  |  | use opentelemetry::context::Context;  | 
28  |  | use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;  | 
29  |  | use serde::{Deserialize, Serialize}; | 
30  |  | use static_assertions::{assert_eq_size, const_assert, const_assert_eq}; | 
31  |  |  | 
/// The version of the awaited action.
///
/// `AwaitedAction::increment_version` advances this number by exactly one,
/// while `AwaitedAction::set_version` overwrites it with an arbitrary value.
/// The value is signed, so it is reinterpreted as unsigned bits when it is
/// published as a metric.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
struct AwaitedActionVersion(i64);
37  |  |  | 
38  |  | impl MetricsComponent for AwaitedActionVersion { | 
39  | 0  |     fn publish(  | 
40  | 0  |         &self,  | 
41  | 0  |         _kind: MetricKind,  | 
42  | 0  |         _field_metadata: MetricFieldData,  | 
43  | 0  |     ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { | 
44  | 0  |         Ok(MetricPublishKnownKindData::Counter(u64::from_ne_bytes(  | 
45  | 0  |             self.0.to_ne_bytes(),  | 
46  | 0  |         )))  | 
47  | 0  |     }  | 
48  |  | }  | 
49  |  |  | 
/// An action that is being awaited on, together with its last known state.
///
/// The struct derives `Serialize`/`Deserialize`; the `TryFrom<&[u8]>` impl in
/// this file decodes it from JSON.
#[derive(Debug, Clone, MetricsComponent, Serialize, Deserialize)]
pub struct AwaitedAction {
    /// The current version of the action.
    #[metric(help = "The version of the AwaitedAction")]
    version: AwaitedActionVersion,

    /// The action that is being awaited on.
    #[metric(help = "The action info of the AwaitedAction")]
    action_info: Arc<ActionInfo>,

    /// The operation id of the action.
    // If you need the client operation id, it may be set in
    // ActionState::client_operation_id (see `set_client_operation_id`).
    #[metric(help = "The operation id of the AwaitedAction")]
    operation_id: OperationId,

    /// The current sort key used to order the actions.
    #[metric(help = "The sort key of the AwaitedAction")]
    sort_key: AwaitedActionSortKey,

    /// The last time a worker-side update touched the action. It is refreshed
    /// by `worker_keep_alive`, by `set_worker_id` when the assignment changes,
    /// and by `worker_set_state`.
    #[metric(help = "The last time the worker updated the AwaitedAction")]
    last_worker_updated_timestamp: SystemTime,

    /// The last time the client sent a keepalive message.
    #[metric(help = "The last time the client sent a keepalive message")]
    last_client_keepalive_timestamp: SystemTime,

    /// Worker that is currently running this action, `None` if unassigned.
    #[metric(help = "The worker id of the AwaitedAction")]
    worker_id: Option<WorkerId>,

    /// The current state of the action.
    #[metric(help = "The state of the AwaitedAction")]
    state: Arc<ActionState>,

    /// The origin metadata of the action. It is populated in `new` only when
    /// the current OpenTelemetry context carries baggage.
    maybe_origin_metadata: Option<OriginMetadata>,

    /// Number of attempts the job has been tried. Starts at zero in `new`.
    #[metric(help = "The number of attempts the AwaitedAction has been tried")]
    pub attempts: usize,
}
94  |  |  | 
95  |  | impl AwaitedAction { | 
96  | 37  |     pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>, now: SystemTime) -> Self { | 
97  | 37  |         let sort_key = AwaitedActionSortKey::new_with_unique_key(  | 
98  | 37  |             action_info.priority,  | 
99  | 37  |             &action_info.insert_timestamp,  | 
100  |  |         );  | 
101  | 37  |         let action_state = Arc::new(ActionState { | 
102  | 37  |             stage: ActionStage::Queued,  | 
103  | 37  |             // Note: We don't use the real client_operation_id here because  | 
104  | 37  |             // the only place AwaitedAction::new should ever be called is  | 
105  | 37  |             // when the action is first created and this struct will be stored  | 
106  | 37  |             // in the database, so we don't want to accidentally leak the  | 
107  | 37  |             // client_operation_id to all clients.  | 
108  | 37  |             client_operation_id: operation_id.clone(),  | 
109  | 37  |             action_digest: action_info.unique_qualifier.digest(),  | 
110  | 37  |         });  | 
111  |  |  | 
112  | 37  |         let ctx = Context::current();  | 
113  | 37  |         let baggage = ctx.baggage();  | 
114  |  |  | 
115  | 37  |         let maybe_origin_metadata = if baggage.is_empty() {  Branch (115:40): [True: 33, False: 4]
   Branch (115:40): [Folded - Ignored]
  | 
116  | 33  |             None  | 
117  |  |         } else { | 
118  |  |             Some(OriginMetadata { | 
119  | 4  |                 identity: baggage  | 
120  | 4  |                     .get(ENDUSER_ID)  | 
121  | 4  |                     .map(|v| v.as_str().to_string())  | 
122  | 4  |                     .unwrap_or_default(),  | 
123  | 4  |                 bazel_metadata: None, // TODO(palfrey): Implement conversion.  | 
124  |  |             })  | 
125  |  |         };  | 
126  |  |  | 
127  | 37  |         Self { | 
128  | 37  |             version: AwaitedActionVersion(0),  | 
129  | 37  |             action_info,  | 
130  | 37  |             operation_id,  | 
131  | 37  |             sort_key,  | 
132  | 37  |             attempts: 0,  | 
133  | 37  |             last_worker_updated_timestamp: now,  | 
134  | 37  |             last_client_keepalive_timestamp: now,  | 
135  | 37  |             maybe_origin_metadata,  | 
136  | 37  |             worker_id: None,  | 
137  | 37  |             state: action_state,  | 
138  | 37  |         }  | 
139  | 37  |     }  | 
140  |  |  | 
141  | 97  |     pub(crate) const fn version(&self) -> i64 { | 
142  | 97  |         self.version.0  | 
143  | 97  |     }  | 
144  |  |  | 
145  | 41  |     pub(crate) const fn set_version(&mut self, version: i64) { | 
146  | 41  |         self.version = AwaitedActionVersion(version);  | 
147  | 41  |     }  | 
148  |  |  | 
149  | 39  |     pub(crate) const fn increment_version(&mut self) { | 
150  | 39  |         self.version = AwaitedActionVersion(self.version.0 + 1);  | 
151  | 39  |     }  | 
152  |  |  | 
153  | 202  |     pub const fn action_info(&self) -> &Arc<ActionInfo> { | 
154  | 202  |         &self.action_info  | 
155  | 202  |     }  | 
156  |  |  | 
157  | 136  |     pub const fn operation_id(&self) -> &OperationId { | 
158  | 136  |         &self.operation_id  | 
159  | 136  |     }  | 
160  |  |  | 
161  | 78  |     pub(crate) const fn sort_key(&self) -> AwaitedActionSortKey { | 
162  | 78  |         self.sort_key  | 
163  | 78  |     }  | 
164  |  |  | 
165  | 938  |     pub const fn state(&self) -> &Arc<ActionState> { | 
166  | 938  |         &self.state  | 
167  | 938  |     }  | 
168  |  |  | 
169  | 134  |     pub(crate) const fn maybe_origin_metadata(&self) -> Option<&OriginMetadata> { | 
170  | 134  |         self.maybe_origin_metadata.as_ref()  | 
171  | 134  |     }  | 
172  |  |  | 
173  | 65  |     pub(crate) const fn worker_id(&self) -> Option<&WorkerId> { | 
174  | 65  |         self.worker_id.as_ref()  | 
175  | 65  |     }  | 
176  |  |  | 
177  | 555  |     pub(crate) const fn last_worker_updated_timestamp(&self) -> SystemTime { | 
178  | 555  |         self.last_worker_updated_timestamp  | 
179  | 555  |     }  | 
180  |  |  | 
181  | 92  |     pub(crate) const fn worker_keep_alive(&mut self, now: SystemTime) { | 
182  | 92  |         self.last_worker_updated_timestamp = now;  | 
183  | 92  |     }  | 
184  |  |  | 
185  | 586  |     pub(crate) const fn last_client_keepalive_timestamp(&self) -> SystemTime { | 
186  | 586  |         self.last_client_keepalive_timestamp  | 
187  | 586  |     }  | 
188  |  |  | 
189  | 61  |     pub(crate) const fn update_client_keep_alive(&mut self, now: SystemTime) { | 
190  | 61  |         self.last_client_keepalive_timestamp = now;  | 
191  | 61  |     }  | 
192  |  |  | 
193  | 70  |     pub(crate) fn set_client_operation_id(&mut self, client_operation_id: OperationId) { | 
194  | 70  |         Arc::make_mut(&mut self.state).client_operation_id = client_operation_id;  | 
195  | 70  |     }  | 
196  |  |  | 
197  |  |     /// Sets the worker id that is currently processing this action.  | 
198  | 49  |     pub(crate) fn set_worker_id(&mut self, new_maybe_worker_id: Option<WorkerId>, now: SystemTime) { | 
199  | 49  |         if self.worker_id != new_maybe_worker_id {  Branch (199:12): [True: 42, False: 7]
   Branch (199:12): [Folded - Ignored]
  | 
200  | 42  |             self.worker_id = new_maybe_worker_id;  | 
201  | 42  |             self.worker_keep_alive(now);  | 
202  | 42  |         }7   | 
203  | 49  |     }  | 
204  |  |  | 
205  |  |     /// Sets the current state of the action and updates the last worker updated timestamp.  | 
206  | 50  |     pub fn worker_set_state(&mut self, mut state: Arc<ActionState>, now: SystemTime) { | 
207  | 50  |         core::mem::swap(&mut self.state, &mut state);  | 
208  | 50  |         self.worker_keep_alive(now);  | 
209  | 50  |     }  | 
210  |  | }  | 
211  |  |  | 
212  |  | impl TryFrom<&[u8]> for AwaitedAction { | 
213  |  |     type Error = Error;  | 
214  | 0  |     fn try_from(value: &[u8]) -> Result<Self, Self::Error> { | 
215  | 0  |         serde_json::from_slice(value)  | 
216  | 0  |             .map_err(|e| make_input_err!("{}", e.to_string())) | 
217  | 0  |             .err_tip(|| "In AwaitedAction::TryFrom::&[u8]")  | 
218  | 0  |     }  | 
219  |  | }  | 
220  |  |  | 
/// The key used to sort the awaited actions.
///
/// The key is a single `u64`, so comparing keys compares, in order:
/// 1. priority of the action (a higher priority yields a larger key)
/// 2. insert order of the action (an earlier insert yields a larger key)
///
// NOTE(review): earlier documentation listed a third, hash-based component
// ("mostly random hash based on the action info"). The key built by
// `AwaitedActionSortKey::new` contains only the two fields above, so two
// actions with equal priority inserted in the same second get equal keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct AwaitedActionSortKey(u64);
230  |  |  | 
231  |  | impl MetricsComponent for AwaitedActionSortKey { | 
232  | 0  |     fn publish(  | 
233  | 0  |         &self,  | 
234  | 0  |         _kind: MetricKind,  | 
235  | 0  |         _field_metadata: MetricFieldData,  | 
236  | 0  |     ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { | 
237  | 0  |         Ok(MetricPublishKnownKindData::Counter(self.0))  | 
238  | 0  |     }  | 
239  |  | }  | 
240  |  |  | 
241  |  | impl AwaitedActionSortKey { | 
242  | 37  |     const fn new(priority: i32, insert_timestamp: u32) -> Self { | 
243  |  |         // Shift the signed i32 range [i32::MIN, i32::MAX] to the unsigned u32 range  | 
244  |  |         // [0, u32::MAX] to preserve ordering when we convert to bytes for sorting.  | 
245  | 37  |         let priority_u32 = i32::MIN.unsigned_abs().wrapping_add_signed(priority);  | 
246  | 37  |         let priority = priority_u32.to_be_bytes();  | 
247  |  |  | 
248  |  |         // Invert our timestamp so the larger the timestamp the lower the number.  | 
249  |  |         // This makes timestamp descending order instead of ascending.  | 
250  | 37  |         let timestamp = (insert_timestamp ^ u32::MAX).to_be_bytes();  | 
251  |  |  | 
252  | 37  |         Self(u64::from_be_bytes([  | 
253  | 37  |             priority[0],  | 
254  | 37  |             priority[1],  | 
255  | 37  |             priority[2],  | 
256  | 37  |             priority[3],  | 
257  | 37  |             timestamp[0],  | 
258  | 37  |             timestamp[1],  | 
259  | 37  |             timestamp[2],  | 
260  | 37  |             timestamp[3],  | 
261  | 37  |         ]))  | 
262  | 37  |     }  | 
263  |  |  | 
264  | 37  |     fn new_with_unique_key(priority: i32, insert_timestamp: &SystemTime) -> Self { | 
265  | 37  |         let timestamp = u32::try_from(  | 
266  | 37  |             insert_timestamp  | 
267  | 37  |                 .duration_since(UNIX_EPOCH)  | 
268  | 37  |                 .unwrap()  | 
269  | 37  |                 .as_secs(),  | 
270  |  |         )  | 
271  | 37  |         .unwrap_or(u32::MAX);  | 
272  | 37  |         Self::new(priority, timestamp)  | 
273  | 37  |     }  | 
274  |  |  | 
275  | 15  |     pub(crate) const fn as_u64(self) -> u64 { | 
276  | 15  |         self.0  | 
277  | 15  |     }  | 
278  |  | }  | 
279  |  |  | 
// Ensure the size of the sort key is the same as a `u64`.
assert_eq_size!(AwaitedActionSortKey, u64);

// Pin the exact bit layout: priority in the upper 32 bits, inverted
// timestamp in the lower 32 bits.
const_assert_eq!(
    AwaitedActionSortKey::new(0x1234_5678, 0x9abc_def0).0,
    // Note: Result has 0x12345678 + 0x80000000 = 0x92345678 because we need
    // to shift the `i32::MIN` value to be represented by zero.
    // Note: `6543210f` are the inverted bits of `9abcdef0`.
    // Inverting the timestamp means that, for equal priorities, the lowest
    // (earliest) timestamp produces the largest key.
    AwaitedActionSortKey(0x9234_5678_6543_210f).0
);
// Ensure the priority is used as the sort key first.
const_assert!(
    AwaitedActionSortKey::new(i32::MAX, 0).0 > AwaitedActionSortKey::new(i32::MAX - 1, 0).0
);
const_assert!(AwaitedActionSortKey::new(i32::MAX - 1, 0).0 > AwaitedActionSortKey::new(1, 0).0);
const_assert!(AwaitedActionSortKey::new(1, 0).0 > AwaitedActionSortKey::new(0, 0).0);
const_assert!(AwaitedActionSortKey::new(0, 0).0 > AwaitedActionSortKey::new(-1, 0).0);
const_assert!(AwaitedActionSortKey::new(-1, 0).0 > AwaitedActionSortKey::new(i32::MIN + 1, 0).0);
const_assert!(
    AwaitedActionSortKey::new(i32::MIN + 1, 0).0 > AwaitedActionSortKey::new(i32::MIN, 0).0
);

// Ensure the insert timestamp is used as the sort key second: with equal
// priority, the smaller (earlier) timestamp must produce the larger key.
const_assert!(AwaitedActionSortKey::new(0, u32::MIN).0 > AwaitedActionSortKey::new(0, u32::MAX).0);