/build/source/nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::sync::Arc; |
16 | | use std::time::{SystemTime, UNIX_EPOCH}; |
17 | | |
18 | | use nativelink_error::{make_input_err, Error, ResultExt}; |
19 | | use nativelink_metric::{ |
20 | | MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, |
21 | | }; |
22 | | use nativelink_util::action_messages::{ |
23 | | ActionInfo, ActionStage, ActionState, OperationId, WorkerId, |
24 | | }; |
25 | | use serde::{Deserialize, Serialize}; |
26 | | use static_assertions::{assert_eq_size, const_assert, const_assert_eq}; |
27 | | |
28 | | /// The version of the awaited action. |
29 | | /// This number will always increment by one each time |
30 | | /// the action is updated. |
31 | 2 | #[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)] |
32 | | struct AwaitedActionVersion(u64); |
33 | | |
34 | | impl MetricsComponent for AwaitedActionVersion { |
35 | 0 | fn publish( |
36 | 0 | &self, |
37 | 0 | _kind: MetricKind, |
38 | 0 | _field_metadata: MetricFieldData, |
39 | 0 | ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { |
40 | 0 | Ok(MetricPublishKnownKindData::Counter(self.0)) |
41 | 0 | } |
42 | | } |
43 | | |
44 | | /// An action that is being awaited on and last known state. |
45 | 18 | #[derive(Debug, Clone, MetricsComponent, Serialize, Deserialize)] |
46 | | pub struct AwaitedAction { |
47 | | /// The current version of the action. |
48 | | #[metric(help = "The version of the AwaitedAction")] |
49 | | version: AwaitedActionVersion, |
50 | | |
51 | | /// The action that is being awaited on. |
52 | | #[metric(help = "The action info of the AwaitedAction")] |
53 | | action_info: Arc<ActionInfo>, |
54 | | |
55 | | /// The operation id of the action. |
56 | | // If you need the client operation id, it may be set in |
57 | | // ActionState::operation_id. |
58 | | #[metric(help = "The operation id of the AwaitedAction")] |
59 | | operation_id: OperationId, |
60 | | |
61 | | /// The current sort key used to order the actions. |
62 | | #[metric(help = "The sort key of the AwaitedAction")] |
63 | | sort_key: AwaitedActionSortKey, |
64 | | |
65 | | /// The time the action was last updated. |
66 | | #[metric(help = "The last time the worker updated the AwaitedAction")] |
67 | | last_worker_updated_timestamp: SystemTime, |
68 | | |
69 | | /// Worker that is currently running this action, None if unassigned. |
70 | | #[metric(help = "The worker id of the AwaitedAction")] |
71 | | worker_id: Option<WorkerId>, |
72 | | |
73 | | /// The current state of the action. |
74 | | #[metric(help = "The state of the AwaitedAction")] |
75 | | state: Arc<ActionState>, |
76 | | |
77 | | /// Number of times the job has been attempted. |
78 | | #[metric(help = "The number of attempts the AwaitedAction has been tried")] |
79 | | pub attempts: usize, |
80 | | } |
81 | | |
82 | | impl AwaitedAction { |
83 | 33 | pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>, now: SystemTime) -> Self { |
84 | 33 | let stage = ActionStage::Queued; |
85 | 33 | let sort_key = AwaitedActionSortKey::new_with_unique_key( |
86 | 33 | action_info.priority, |
87 | 33 | &action_info.insert_timestamp, |
88 | 33 | ); |
89 | 33 | let state = Arc::new(ActionState { |
90 | 33 | stage, |
91 | 33 | // Note: We don't use the real client_operation_id here because |
92 | 33 | // the only place AwaitedAction::new should ever be called is |
93 | 33 | // when the action is first created and this struct will be stored |
94 | 33 | // in the database, so we don't want to accidentally leak the |
95 | 33 | // client_operation_id to all clients. |
96 | 33 | client_operation_id: operation_id.clone(), |
97 | 33 | action_digest: action_info.unique_qualifier.digest(), |
98 | 33 | }); |
99 | 33 | Self { |
100 | 33 | version: AwaitedActionVersion(0), |
101 | 33 | action_info, |
102 | 33 | operation_id, |
103 | 33 | sort_key, |
104 | 33 | attempts: 0, |
105 | 33 | last_worker_updated_timestamp: now, |
106 | 33 | worker_id: None, |
107 | 33 | state, |
108 | 33 | } |
109 | 33 | } |
110 | | |
111 | 80 | pub(crate) fn version(&self) -> u64 { |
112 | 80 | self.version.0 |
113 | 80 | } |
114 | | |
115 | 2 | pub(crate) fn set_version(&mut self, version: u64) { |
116 | 2 | self.version = AwaitedActionVersion(version); |
117 | 2 | } |
118 | | |
119 | 39 | pub(crate) fn increment_version(&mut self) { |
120 | 39 | self.version = AwaitedActionVersion(self.version.0 + 1); |
121 | 39 | } |
122 | | |
123 | 170 | pub fn action_info(&self) -> &Arc<ActionInfo> { |
124 | 170 | &self.action_info |
125 | 170 | } |
126 | | |
127 | 91 | pub fn operation_id(&self) -> &OperationId { |
128 | 91 | &self.operation_id |
129 | 91 | } |
130 | | |
131 | 64 | pub(crate) fn sort_key(&self) -> AwaitedActionSortKey { |
132 | 64 | self.sort_key |
133 | 64 | } |
134 | | |
135 | 430 | pub fn state(&self) -> &Arc<ActionState> { |
136 | 430 | &self.state |
137 | 430 | } |
138 | | |
139 | 53 | pub(crate) fn worker_id(&self) -> Option<WorkerId> { |
140 | 53 | self.worker_id |
141 | 53 | } |
142 | | |
143 | 46 | pub(crate) fn last_worker_updated_timestamp(&self) -> SystemTime { |
144 | 46 | self.last_worker_updated_timestamp |
145 | 46 | } |
146 | | |
147 | 79 | pub(crate) fn keep_alive(&mut self, now: SystemTime) { |
148 | 79 | self.last_worker_updated_timestamp = now; |
149 | 79 | } |
150 | | |
151 | | /// Sets the worker id that is currently processing this action. |
152 | 41 | pub(crate) fn set_worker_id(&mut self, new_maybe_worker_id: Option<WorkerId>, now: SystemTime) { |
153 | 41 | if self.worker_id != new_maybe_worker_id { Branch (153:12): [True: 35, False: 6]
Branch (153:12): [Folded - Ignored]
|
154 | 35 | self.worker_id = new_maybe_worker_id; |
155 | 35 | self.keep_alive(now); |
156 | 35 | } |
157 | 41 | } |
158 | | |
159 | | /// Sets the current state of the action, replacing the previous state. |
160 | | /// If `now` is provided, the last worker-updated timestamp is refreshed as well. |
161 | 102 | pub fn set_state(&mut self, mut state: Arc<ActionState>, now: Option<SystemTime>) { |
162 | 102 | std::mem::swap(&mut self.state, &mut state); |
163 | 102 | if let Some(now) = now { Branch (163:16): [True: 44, False: 58]
Branch (163:16): [Folded - Ignored]
|
164 | 44 | self.keep_alive(now); |
165 | 58 | } |
166 | 102 | } |
167 | | } |
168 | | |
169 | | impl TryFrom<&[u8]> for AwaitedAction { |
170 | | type Error = Error; |
171 | 0 | fn try_from(value: &[u8]) -> Result<Self, Self::Error> { |
172 | 0 | serde_json::from_slice(value) |
173 | 0 | .map_err(|e| make_input_err!("{}", e.to_string())) |
174 | 0 | .err_tip(|| "In AwaitedAction::TryFrom::&[u8]") |
175 | 0 | } |
176 | | } |
177 | | |
178 | | /// The key used to sort the awaited actions. |
179 | | /// |
180 | | /// The rules for sorting are as follows: |
181 | | /// 1. priority of the action |
182 | | /// 2. insert order of the action (lower = higher priority) |
183 | | /// 3. (mostly random hash based on the action info) |
184 | 2 | #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] |
185 | | #[repr(transparent)] |
186 | | pub struct AwaitedActionSortKey(u64); |
187 | | |
188 | | impl MetricsComponent for AwaitedActionSortKey { |
189 | 0 | fn publish( |
190 | 0 | &self, |
191 | 0 | _kind: MetricKind, |
192 | 0 | _field_metadata: MetricFieldData, |
193 | 0 | ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { |
194 | 0 | Ok(MetricPublishKnownKindData::Counter(self.0)) |
195 | 0 | } |
196 | | } |
197 | | |
198 | | impl AwaitedActionSortKey { |
199 | | #[rustfmt::skip] |
200 | 33 | const fn new(priority: i32, insert_timestamp: u32) -> Self { |
201 | | // Shift `priority` so [`i32::MIN`] is represented by zero. |
202 | | // This makes it so any negative values are positive, but |
203 | | // maintains ordering. |
204 | | const MIN_I32: i64 = (i32::MIN as i64).abs(); |
205 | 33 | let priority = ((priority as i64 + MIN_I32) as u32).to_be_bytes(); |
206 | 33 | |
207 | 33 | // Invert our timestamp so the larger the timestamp the lower the number. |
208 | 33 | // This makes timestamp descending order instead of ascending. |
209 | 33 | let timestamp = (insert_timestamp ^ u32::MAX).to_be_bytes(); |
210 | 33 | |
211 | 33 | AwaitedActionSortKey(u64::from_be_bytes([ |
212 | 33 | priority[0], priority[1], priority[2], priority[3], |
213 | 33 | timestamp[0], timestamp[1], timestamp[2], timestamp[3], |
214 | 33 | ])) |
215 | 33 | } |
216 | | |
217 | 33 | fn new_with_unique_key(priority: i32, insert_timestamp: &SystemTime) -> Self { |
218 | 33 | let timestamp = insert_timestamp |
219 | 33 | .duration_since(UNIX_EPOCH) |
220 | 33 | .unwrap() |
221 | 33 | .as_secs() as u32; |
222 | 33 | Self::new(priority, timestamp) |
223 | 33 | } |
224 | | |
225 | 2 | pub(crate) fn as_u64(self) -> u64 { |
226 | 2 | self.0 |
227 | 2 | } |
228 | | } |
229 | | |
230 | | // Ensure the size of the sort key is the same as a `u64`. |
231 | | assert_eq_size!(AwaitedActionSortKey, u64); |
232 | | |
233 | | const_assert_eq!( |
234 | | AwaitedActionSortKey::new(0x1234_5678, 0x9abc_def0).0, |
235 | | // Note: Result has 0x12345678 + 0x80000000 = 0x92345678 because we need |
236 | | // to shift the `i32::MIN` value to be represented by zero. |
237 | | // Note: `6543210f` are the inverted bits of `9abcdef0`. |
238 | | // This effectively inverts the timestamp ordering so that the lowest |
239 | | // (earliest) timestamps produce the highest sort keys. |
240 | | AwaitedActionSortKey(0x9234_5678_6543_210f).0 |
241 | | ); |
242 | | // Ensure the priority is used as the sort key first. |
243 | | const_assert!( |
244 | | AwaitedActionSortKey::new(i32::MAX, 0).0 > AwaitedActionSortKey::new(i32::MAX - 1, 0).0 |
245 | | ); |
246 | | const_assert!(AwaitedActionSortKey::new(i32::MAX - 1, 0).0 > AwaitedActionSortKey::new(1, 0).0); |
247 | | const_assert!(AwaitedActionSortKey::new(1, 0).0 > AwaitedActionSortKey::new(0, 0).0); |
248 | | const_assert!(AwaitedActionSortKey::new(0, 0).0 > AwaitedActionSortKey::new(-1, 0).0); |
249 | | const_assert!(AwaitedActionSortKey::new(-1, 0).0 > AwaitedActionSortKey::new(i32::MIN + 1, 0).0); |
250 | | const_assert!( |
251 | | AwaitedActionSortKey::new(i32::MIN + 1, 0).0 > AwaitedActionSortKey::new(i32::MIN, 0).0 |
252 | | ); |
253 | | |
254 | | // Ensure the insert timestamp is used as the sort key second. |
255 | | const_assert!(AwaitedActionSortKey::new(0, u32::MIN).0 > AwaitedActionSortKey::new(0, u32::MAX).0); |