Coverage Report

Created: 2025-03-08 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
use std::time::{SystemTime, UNIX_EPOCH};
17
18
use nativelink_error::{make_input_err, Error, ResultExt};
19
use nativelink_metric::{
20
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
21
};
22
use nativelink_util::action_messages::{
23
    ActionInfo, ActionStage, ActionState, OperationId, WorkerId,
24
};
25
use nativelink_util::origin_context::ActiveOriginContext;
26
use nativelink_util::origin_event::{OriginMetadata, ORIGIN_EVENT_COLLECTOR};
27
use serde::{Deserialize, Serialize};
28
use static_assertions::{assert_eq_size, const_assert, const_assert_eq};
29
30
/// The version of the awaited action.
31
/// This number will always increment by one each time
32
/// the action is updated.
33
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
34
struct AwaitedActionVersion(u64);
35
36
impl MetricsComponent for AwaitedActionVersion {
37
0
    fn publish(
38
0
        &self,
39
0
        _kind: MetricKind,
40
0
        _field_metadata: MetricFieldData,
41
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
42
0
        Ok(MetricPublishKnownKindData::Counter(self.0))
43
0
    }
44
}
45
46
/// An action that is being awaited on and last known state.
47
#[derive(Debug, Clone, MetricsComponent, Serialize, Deserialize)]
48
pub struct AwaitedAction {
49
    /// The current version of the action.
50
    #[metric(help = "The version of the AwaitedAction")]
51
    version: AwaitedActionVersion,
52
53
    /// The action that is being awaited on.
54
    #[metric(help = "The action info of the AwaitedAction")]
55
    action_info: Arc<ActionInfo>,
56
57
    /// The operation id of the action.
58
    // If you need the client operation id, it may be set in
59
    // ActionState::operation_id.
60
    #[metric(help = "The operation id of the AwaitedAction")]
61
    operation_id: OperationId,
62
63
    /// The currentsort key used to order the actions.
64
    #[metric(help = "The sort key of the AwaitedAction")]
65
    sort_key: AwaitedActionSortKey,
66
67
    /// The time the action was last updated.
68
    #[metric(help = "The last time the worker updated the AwaitedAction")]
69
    last_worker_updated_timestamp: SystemTime,
70
71
    /// The last time the client sent a keepalive message.
72
    #[metric(help = "The last time the client sent a keepalive message")]
73
    last_client_keepalive_timestamp: SystemTime,
74
75
    /// Worker that is currently running this action, None if unassigned.
76
    #[metric(help = "The worker id of the AwaitedAction")]
77
    worker_id: Option<WorkerId>,
78
79
    /// The current state of the action.
80
    #[metric(help = "The state of the AwaitedAction")]
81
    state: Arc<ActionState>,
82
83
    /// The origin metadata of the action.
84
    maybe_origin_metadata: Option<OriginMetadata>,
85
86
    /// Number of attempts the job has been tried.
87
    #[metric(help = "The number of attempts the AwaitedAction has been tried")]
88
    pub attempts: usize,
89
}
90
91
impl AwaitedAction {
92
34
    pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>, now: SystemTime) -> Self {
93
34
        let stage = ActionStage::Queued;
94
34
        let sort_key = AwaitedActionSortKey::new_with_unique_key(
95
34
            action_info.priority,
96
34
            &action_info.insert_timestamp,
97
34
        );
98
34
        let state = Arc::new(ActionState {
99
34
            stage,
100
34
            // Note: We don't use the real client_operation_id here because
101
34
            // the only place AwaitedAction::new should ever be called is
102
34
            // when the action is first created and this struct will be stored
103
34
            // in the database, so we don't want to accidentally leak the
104
34
            // client_operation_id to all clients.
105
34
            client_operation_id: operation_id.clone(),
106
34
            action_digest: action_info.unique_qualifier.digest(),
107
34
        });
108
34
        let maybe_origin_metadata = ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR)
109
34
            .ok()
110
34
            .flatten()
111
34
            .map(|v| 
v.metadata.clone()0
);
112
34
113
34
        Self {
114
34
            version: AwaitedActionVersion(0),
115
34
            action_info,
116
34
            operation_id,
117
34
            sort_key,
118
34
            attempts: 0,
119
34
            last_worker_updated_timestamp: now,
120
34
            last_client_keepalive_timestamp: now,
121
34
            maybe_origin_metadata,
122
34
            worker_id: None,
123
34
            state,
124
34
        }
125
34
    }
126
127
80
    pub(crate) const fn version(&self) -> u64 {
128
80
        self.version.0
129
80
    }
130
131
2
    pub(crate) fn set_version(&mut self, version: u64) {
132
2
        self.version = AwaitedActionVersion(version);
133
2
    }
134
135
39
    pub(crate) fn increment_version(&mut self) {
136
39
        self.version = AwaitedActionVersion(self.version.0 + 1);
137
39
    }
138
139
173
    pub const fn action_info(&self) -> &Arc<ActionInfo> {
140
173
        &self.action_info
141
173
    }
142
143
91
    pub const fn operation_id(&self) -> &OperationId {
144
91
        &self.operation_id
145
91
    }
146
147
65
    pub(crate) const fn sort_key(&self) -> AwaitedActionSortKey {
148
65
        self.sort_key
149
65
    }
150
151
877
    pub const fn state(&self) -> &Arc<ActionState> {
152
877
        &self.state
153
877
    }
154
155
120
    pub(crate) const fn maybe_origin_metadata(&self) -> Option<&OriginMetadata> {
156
120
        self.maybe_origin_metadata.as_ref()
157
120
    }
158
159
53
    pub(crate) const fn worker_id(&self) -> Option<&WorkerId> {
160
53
        self.worker_id.as_ref()
161
53
    }
162
163
549
    pub(crate) const fn last_worker_updated_timestamp(&self) -> SystemTime {
164
549
        self.last_worker_updated_timestamp
165
549
    }
166
167
77
    pub(crate) fn worker_keep_alive(&mut self, now: SystemTime) {
168
77
        self.last_worker_updated_timestamp = now;
169
77
    }
170
171
550
    pub(crate) const fn last_client_keepalive_timestamp(&self) -> SystemTime {
172
550
        self.last_client_keepalive_timestamp
173
550
    }
174
175
57
    pub(crate) fn update_client_keep_alive(&mut self, now: SystemTime) {
176
57
        self.last_client_keepalive_timestamp = now;
177
57
    }
178
179
62
    pub(crate) fn set_client_operation_id(&mut self, client_operation_id: OperationId) {
180
62
        Arc::make_mut(&mut self.state).client_operation_id = client_operation_id;
181
62
    }
182
183
    /// Sets the worker id that is currently processing this action.
184
41
    pub(crate) fn set_worker_id(&mut self, new_maybe_worker_id: Option<WorkerId>, now: SystemTime) {
185
41
        if self.worker_id != new_maybe_worker_id {
  Branch (185:12): [True: 35, False: 6]
  Branch (185:12): [Folded - Ignored]
186
35
            self.worker_id = new_maybe_worker_id;
187
35
            self.worker_keep_alive(now);
188
35
        
}6
189
41
    }
190
191
    /// Sets the current state of the action and updates the last worker updated timestamp.
192
42
    pub fn worker_set_state(&mut self, mut state: Arc<ActionState>, now: SystemTime) {
193
42
        std::mem::swap(&mut self.state, &mut state);
194
42
        self.worker_keep_alive(now);
195
42
    }
196
}
197
198
impl TryFrom<&[u8]> for AwaitedAction {
199
    type Error = Error;
200
0
    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
201
0
        serde_json::from_slice(value)
202
0
            .map_err(|e| make_input_err!("{}", e.to_string()))
203
0
            .err_tip(|| "In AwaitedAction::TryFrom::&[u8]")
204
0
    }
205
}
206
207
/// The key used to sort the awaited actions.
208
///
209
/// The rules for sorting are as follows:
210
/// 1. priority of the action
211
/// 2. insert order of the action (lower = higher priority)
212
/// 3. (mostly random hash based on the action info)
213
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
214
#[repr(transparent)]
215
pub struct AwaitedActionSortKey(u64);
216
217
impl MetricsComponent for AwaitedActionSortKey {
218
0
    fn publish(
219
0
        &self,
220
0
        _kind: MetricKind,
221
0
        _field_metadata: MetricFieldData,
222
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
223
0
        Ok(MetricPublishKnownKindData::Counter(self.0))
224
0
    }
225
}
226
227
impl AwaitedActionSortKey {
228
    #[rustfmt::skip]
229
34
    const fn new(priority: i32, insert_timestamp: u32) -> Self {
230
        // Shift `new_priority` so [`i32::MIN`] is represented by zero.
231
        // This makes it so any nagative values are positive, but
232
        // maintains ordering.
233
        const MIN_I32: i64 = (i32::MIN as i64).abs();
234
34
        let priority = ((priority as i64 + MIN_I32) as u32).to_be_bytes();
235
34
236
34
        // Invert our timestamp so the larger the timestamp the lower the number.
237
34
        // This makes timestamp descending order instead of ascending.
238
34
        let timestamp = (insert_timestamp ^ u32::MAX).to_be_bytes();
239
34
240
34
        AwaitedActionSortKey(u64::from_be_bytes([
241
34
            priority[0], priority[1], priority[2], priority[3],
242
34
            timestamp[0], timestamp[1], timestamp[2], timestamp[3],
243
34
        ]))
244
34
    }
245
246
34
    fn new_with_unique_key(priority: i32, insert_timestamp: &SystemTime) -> Self {
247
34
        let timestamp = insert_timestamp
248
34
            .duration_since(UNIX_EPOCH)
249
34
            .unwrap()
250
34
            .as_secs() as u32;
251
34
        Self::new(priority, timestamp)
252
34
    }
253
254
2
    pub(crate) const fn as_u64(self) -> u64 {
255
2
        self.0
256
2
    }
257
}
258
259
// Ensure the size of the sort key is the same as a `u64`.
260
assert_eq_size!(AwaitedActionSortKey, u64);
261
262
const_assert_eq!(
263
    AwaitedActionSortKey::new(0x1234_5678, 0x9abc_def0).0,
264
    // Note: Result has 0x12345678 + 0x80000000 = 0x92345678 because we need
265
    // to shift the `i32::MIN` value to be represented by zero.
266
    // Note: `6543210f` are the inverted bits of `9abcdef0`.
267
    // This effectively inverts the priority to now have the highest priority
268
    // be the lowest timestamps.
269
    AwaitedActionSortKey(0x9234_5678_6543_210f).0
270
);
271
// Ensure the priority is used as the sort key first.
272
const_assert!(
273
    AwaitedActionSortKey::new(i32::MAX, 0).0 > AwaitedActionSortKey::new(i32::MAX - 1, 0).0
274
);
275
const_assert!(AwaitedActionSortKey::new(i32::MAX - 1, 0).0 > AwaitedActionSortKey::new(1, 0).0);
276
const_assert!(AwaitedActionSortKey::new(1, 0).0 > AwaitedActionSortKey::new(0, 0).0);
277
const_assert!(AwaitedActionSortKey::new(0, 0).0 > AwaitedActionSortKey::new(-1, 0).0);
278
const_assert!(AwaitedActionSortKey::new(-1, 0).0 > AwaitedActionSortKey::new(i32::MIN + 1, 0).0);
279
const_assert!(
280
    AwaitedActionSortKey::new(i32::MIN + 1, 0).0 > AwaitedActionSortKey::new(i32::MIN, 0).0
281
);
282
283
// Ensure the insert timestamp is used as the sort key second.
284
const_assert!(AwaitedActionSortKey::new(0, u32::MIN).0 > AwaitedActionSortKey::new(0, u32::MAX).0);