/build/source/nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::sync::Arc; |
16 | | use std::time::{SystemTime, UNIX_EPOCH}; |
17 | | |
18 | | use nativelink_error::{Error, ResultExt, make_input_err}; |
19 | | use nativelink_metric::{ |
20 | | MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, |
21 | | }; |
22 | | use nativelink_util::action_messages::{ |
23 | | ActionInfo, ActionStage, ActionState, OperationId, WorkerId, |
24 | | }; |
25 | | use nativelink_util::origin_event::OriginMetadata; |
26 | | use opentelemetry::baggage::BaggageExt; |
27 | | use opentelemetry::context::Context; |
28 | | use opentelemetry_semantic_conventions::attribute::ENDUSER_ID; |
29 | | use serde::{Deserialize, Serialize}; |
30 | | use static_assertions::{assert_eq_size, const_assert, const_assert_eq}; |
31 | | |
32 | | /// The version of the awaited action. |
33 | | /// This number is expected to increment by one each time the action is updated |
34 | | /// (see `increment_version`); `set_version` can overwrite it with an arbitrary value. |
35 | | #[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)] |
36 | | struct AwaitedActionVersion(i64); |
37 | | |
38 | | impl MetricsComponent for AwaitedActionVersion { |
39 | 0 | fn publish( |
40 | 0 | &self, |
41 | 0 | _kind: MetricKind, |
42 | 0 | _field_metadata: MetricFieldData, |
43 | 0 | ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { |
44 | 0 | Ok(MetricPublishKnownKindData::Counter(u64::from_ne_bytes( |
45 | 0 | self.0.to_ne_bytes(), |
46 | 0 | ))) |
47 | 0 | } |
48 | | } |
49 | | |
50 | | /// An action that is being awaited on, together with its last known state. |
51 | | #[derive(Debug, Clone, MetricsComponent, Serialize, Deserialize)] |
52 | | pub struct AwaitedAction { |
53 | | /// The current version of the action. |
54 | | #[metric(help = "The version of the AwaitedAction")] |
55 | | version: AwaitedActionVersion, |
56 | | |
57 | | /// The action that is being awaited on. |
58 | | #[metric(help = "The action info of the AwaitedAction")] |
59 | | action_info: Arc<ActionInfo>, |
60 | | |
61 | | /// The operation id of the action. |
62 | | // If you need the client operation id, it may be set in |
63 | | // `ActionState::client_operation_id` (see `set_client_operation_id`). |
64 | | #[metric(help = "The operation id of the AwaitedAction")] |
65 | | operation_id: OperationId, |
66 | | |
67 | | /// The current sort key used to order the actions. |
68 | | #[metric(help = "The sort key of the AwaitedAction")] |
69 | | sort_key: AwaitedActionSortKey, |
70 | | |
71 | | /// The last time a worker updated the action (keep-alive, state change or worker reassignment). |
72 | | #[metric(help = "The last time the worker updated the AwaitedAction")] |
73 | | last_worker_updated_timestamp: SystemTime, |
74 | | |
75 | | /// The last time the client sent a keepalive message; initialized to the creation time. |
76 | | #[metric(help = "The last time the client sent a keepalive message")] |
77 | | last_client_keepalive_timestamp: SystemTime, |
78 | | |
79 | | /// Worker that is currently running this action, None if unassigned. |
80 | | #[metric(help = "The worker id of the AwaitedAction")] |
81 | | worker_id: Option<WorkerId>, |
82 | | |
83 | | /// The current state of the action. |
84 | | #[metric(help = "The state of the AwaitedAction")] |
85 | | state: Arc<ActionState>, |
86 | | |
87 | | /// The origin metadata of the action, captured in `new` from the OpenTelemetry baggage (None when the baggage is empty). |
88 | | maybe_origin_metadata: Option<OriginMetadata>, |
89 | | |
90 | | /// Number of times the action has been attempted; starts at 0. |
91 | | #[metric(help = "The number of attempts the AwaitedAction has been tried")] |
92 | | pub attempts: usize, |
93 | | } |
94 | | |
95 | | impl AwaitedAction { |
96 | 35 | pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>, now: SystemTime) -> Self { |
97 | 35 | let sort_key = AwaitedActionSortKey::new_with_unique_key( |
98 | 35 | action_info.priority, |
99 | 35 | &action_info.insert_timestamp, |
100 | | ); |
101 | 35 | let action_state = Arc::new(ActionState { |
102 | 35 | stage: ActionStage::Queued, |
103 | 35 | // Note: We don't use the real client_operation_id here because |
104 | 35 | // the only place AwaitedAction::new should ever be called is |
105 | 35 | // when the action is first created and this struct will be stored |
106 | 35 | // in the database, so we don't want to accidentally leak the |
107 | 35 | // client_operation_id to all clients. |
108 | 35 | client_operation_id: operation_id.clone(), |
109 | 35 | action_digest: action_info.unique_qualifier.digest(), |
110 | 35 | last_transition_timestamp: now, |
111 | 35 | }); |
112 | | |
113 | 35 | let ctx = Context::current(); |
114 | 35 | let baggage = ctx.baggage(); |
115 | | |
116 | 35 | let maybe_origin_metadata = if baggage.is_empty() { Branch (116:40): [True: 31, False: 4]
Branch (116:40): [Folded - Ignored]
|
117 | 31 | None |
118 | | } else { |
119 | | Some(OriginMetadata { |
120 | 4 | identity: baggage |
121 | 4 | .get(ENDUSER_ID) |
122 | 4 | .map(|v| v.as_str().to_string()) |
123 | 4 | .unwrap_or_default(), |
124 | 4 | bazel_metadata: None, // TODO(palfrey): Implement conversion. |
125 | | }) |
126 | | }; |
127 | | |
128 | 35 | Self { |
129 | 35 | version: AwaitedActionVersion(0), |
130 | 35 | action_info, |
131 | 35 | operation_id, |
132 | 35 | sort_key, |
133 | 35 | attempts: 0, |
134 | 35 | last_worker_updated_timestamp: now, |
135 | 35 | last_client_keepalive_timestamp: now, |
136 | 35 | maybe_origin_metadata, |
137 | 35 | worker_id: None, |
138 | 35 | state: action_state, |
139 | 35 | } |
140 | 35 | } |
141 | | |
142 | 94 | pub(crate) const fn version(&self) -> i64 { |
143 | 94 | self.version.0 |
144 | 94 | } |
145 | | |
146 | 36 | pub(crate) const fn set_version(&mut self, version: i64) { |
147 | 36 | self.version = AwaitedActionVersion(version); |
148 | 36 | } |
149 | | |
150 | 39 | pub(crate) const fn increment_version(&mut self) { |
151 | 39 | self.version = AwaitedActionVersion(self.version.0 + 1); |
152 | 39 | } |
153 | | |
154 | 371 | pub const fn action_info(&self) -> &Arc<ActionInfo> { |
155 | 371 | &self.action_info |
156 | 371 | } |
157 | | |
158 | 130 | pub const fn operation_id(&self) -> &OperationId { |
159 | 130 | &self.operation_id |
160 | 130 | } |
161 | | |
162 | 76 | pub(crate) const fn sort_key(&self) -> AwaitedActionSortKey { |
163 | 76 | self.sort_key |
164 | 76 | } |
165 | | |
166 | 1.06k | pub const fn state(&self) -> &Arc<ActionState> { |
167 | 1.06k | &self.state |
168 | 1.06k | } |
169 | | |
170 | 134 | pub(crate) const fn maybe_origin_metadata(&self) -> Option<&OriginMetadata> { |
171 | 134 | self.maybe_origin_metadata.as_ref() |
172 | 134 | } |
173 | | |
174 | 115 | pub(crate) const fn worker_id(&self) -> Option<&WorkerId> { |
175 | 115 | self.worker_id.as_ref() |
176 | 115 | } |
177 | | |
178 | 556 | pub(crate) const fn last_worker_updated_timestamp(&self) -> SystemTime { |
179 | 556 | self.last_worker_updated_timestamp |
180 | 556 | } |
181 | | |
182 | 91 | pub(crate) const fn worker_keep_alive(&mut self, now: SystemTime) { |
183 | 91 | self.last_worker_updated_timestamp = now; |
184 | 91 | } |
185 | | |
186 | 582 | pub(crate) const fn last_client_keepalive_timestamp(&self) -> SystemTime { |
187 | 582 | self.last_client_keepalive_timestamp |
188 | 582 | } |
189 | | |
190 | 60 | pub(crate) const fn update_client_keep_alive(&mut self, now: SystemTime) { |
191 | 60 | self.last_client_keepalive_timestamp = now; |
192 | 60 | } |
193 | | |
194 | 66 | pub(crate) fn set_client_operation_id(&mut self, client_operation_id: OperationId) { |
195 | 66 | Arc::make_mut(&mut self.state).client_operation_id = client_operation_id; |
196 | 66 | } |
197 | | |
198 | | /// Sets the worker id that is currently processing this action. |
199 | 49 | pub(crate) fn set_worker_id(&mut self, new_maybe_worker_id: Option<WorkerId>, now: SystemTime) { |
200 | 49 | if self.worker_id != new_maybe_worker_id { Branch (200:12): [True: 42, False: 7]
Branch (200:12): [Folded - Ignored]
|
201 | 42 | self.worker_id = new_maybe_worker_id; |
202 | 42 | self.worker_keep_alive(now); |
203 | 42 | }7 |
204 | 49 | } |
205 | | |
206 | | /// Sets the current state of the action and updates the last worker updated timestamp. |
207 | 49 | pub fn worker_set_state(&mut self, mut state: Arc<ActionState>, now: SystemTime) { |
208 | 49 | core::mem::swap(&mut self.state, &mut state); |
209 | 49 | self.worker_keep_alive(now); |
210 | 49 | } |
211 | | } |
212 | | |
213 | | impl TryFrom<&[u8]> for AwaitedAction { |
214 | | type Error = Error; |
215 | 0 | fn try_from(value: &[u8]) -> Result<Self, Self::Error> { |
216 | 0 | serde_json::from_slice(value) |
217 | 0 | .map_err(|e| make_input_err!("{}", e.to_string())) |
218 | 0 | .err_tip(|| "In AwaitedAction::TryFrom::&[u8]") |
219 | 0 | } |
220 | | } |
221 | | |
222 | | /// The key used to sort the awaited actions; a larger key corresponds to a higher priority. |
223 | | /// |
224 | | /// The rules for sorting are as follows: |
225 | | /// 1. priority of the action (higher value = larger key) |
226 | | /// 2. insert order of the action (lower = higher priority) |
227 | | /// 3. (mostly random hash based on the action info) NOTE(review): `new` only packs priority and timestamp into the key; no hash component is visible in this file - confirm. |
228 | | #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] |
229 | | #[repr(transparent)] |
230 | | pub struct AwaitedActionSortKey(u64); |
231 | | |
232 | | impl MetricsComponent for AwaitedActionSortKey { |
233 | 0 | fn publish( |
234 | 0 | &self, |
235 | 0 | _kind: MetricKind, |
236 | 0 | _field_metadata: MetricFieldData, |
237 | 0 | ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { |
238 | 0 | Ok(MetricPublishKnownKindData::Counter(self.0)) |
239 | 0 | } |
240 | | } |
241 | | |
242 | | impl AwaitedActionSortKey { |
243 | 35 | const fn new(priority: i32, insert_timestamp: u32) -> Self { |
244 | | // Shift the signed i32 range [i32::MIN, i32::MAX] to the unsigned u32 range |
245 | | // [0, u32::MAX] to preserve ordering when we convert to bytes for sorting. |
246 | 35 | let priority_u32 = i32::MIN.unsigned_abs().wrapping_add_signed(priority); |
247 | 35 | let priority = priority_u32.to_be_bytes(); |
248 | | |
249 | | // Invert our timestamp so the larger the timestamp the lower the number. |
250 | | // This makes timestamp descending order instead of ascending. |
251 | 35 | let timestamp = (insert_timestamp ^ u32::MAX).to_be_bytes(); |
252 | | |
253 | 35 | Self(u64::from_be_bytes([ |
254 | 35 | priority[0], |
255 | 35 | priority[1], |
256 | 35 | priority[2], |
257 | 35 | priority[3], |
258 | 35 | timestamp[0], |
259 | 35 | timestamp[1], |
260 | 35 | timestamp[2], |
261 | 35 | timestamp[3], |
262 | 35 | ])) |
263 | 35 | } |
264 | | |
265 | 35 | fn new_with_unique_key(priority: i32, insert_timestamp: &SystemTime) -> Self { |
266 | 35 | let timestamp = u32::try_from( |
267 | 35 | insert_timestamp |
268 | 35 | .duration_since(UNIX_EPOCH) |
269 | 35 | .unwrap() |
270 | 35 | .as_secs(), |
271 | | ) |
272 | 35 | .unwrap_or(u32::MAX); |
273 | 35 | Self::new(priority, timestamp) |
274 | 35 | } |
275 | | |
276 | 13 | pub(crate) const fn as_u64(self) -> u64 { |
277 | 13 | self.0 |
278 | 13 | } |
279 | | } |
280 | | |
281 | | // Ensure the size of the sort key is the same as a `u64`. |
282 | | assert_eq_size!(AwaitedActionSortKey, u64); |
283 | | |
284 | | const_assert_eq!( |
285 | | AwaitedActionSortKey::new(0x1234_5678, 0x9abc_def0).0, |
286 | | // Note: Result has 0x12345678 + 0x80000000 = 0x92345678 because we need |
287 | | // to shift the `i32::MIN` value to be represented by zero. |
288 | | // Note: `6543210f` are the inverted bits of `9abcdef0`. |
289 | | // Inverting the timestamp bits makes the lowest (earliest) timestamps |
290 | | // produce the highest key within the same priority. |
291 | | AwaitedActionSortKey(0x9234_5678_6543_210f).0 |
292 | | ); |
293 | | // Ensure the priority is used as the sort key first. |
294 | | const_assert!( |
295 | | AwaitedActionSortKey::new(i32::MAX, 0).0 > AwaitedActionSortKey::new(i32::MAX - 1, 0).0 |
296 | | ); |
297 | | const_assert!(AwaitedActionSortKey::new(i32::MAX - 1, 0).0 > AwaitedActionSortKey::new(1, 0).0); |
298 | | const_assert!(AwaitedActionSortKey::new(1, 0).0 > AwaitedActionSortKey::new(0, 0).0); |
299 | | const_assert!(AwaitedActionSortKey::new(0, 0).0 > AwaitedActionSortKey::new(-1, 0).0); |
300 | | const_assert!(AwaitedActionSortKey::new(-1, 0).0 > AwaitedActionSortKey::new(i32::MIN + 1, 0).0); |
301 | | const_assert!( |
302 | | AwaitedActionSortKey::new(i32::MIN + 1, 0).0 > AwaitedActionSortKey::new(i32::MIN, 0).0 |
303 | | ); |
304 | | |
305 | | // Ensure the insert timestamp is used as the sort key second (an earlier timestamp gives a larger key). |
306 | | const_assert!(AwaitedActionSortKey::new(0, u32::MIN).0 > AwaitedActionSortKey::new(0, u32::MAX).0); |