Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
use std::time::{SystemTime, UNIX_EPOCH};
17
18
use nativelink_error::{make_input_err, Error, ResultExt};
19
use nativelink_metric::{
20
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
21
};
22
use nativelink_util::action_messages::{
23
    ActionInfo, ActionStage, ActionState, OperationId, WorkerId,
24
};
25
use serde::{Deserialize, Serialize};
26
use static_assertions::{assert_eq_size, const_assert, const_assert_eq};
27
28
/// The version of the awaited action.
29
/// This number will always increment by one each time
30
/// the action is updated.
31
2
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
32
struct AwaitedActionVersion(u64);
33
34
impl MetricsComponent for AwaitedActionVersion {
35
0
    fn publish(
36
0
        &self,
37
0
        _kind: MetricKind,
38
0
        _field_metadata: MetricFieldData,
39
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
40
0
        Ok(MetricPublishKnownKindData::Counter(self.0))
41
0
    }
42
}
43
44
/// An action that is being awaited on and last known state.
45
18
#[derive(Debug, Clone, MetricsComponent, Serialize, Deserialize)]
46
pub struct AwaitedAction {
47
    /// The current version of the action.
48
    #[metric(help = "The version of the AwaitedAction")]
49
    version: AwaitedActionVersion,
50
51
    /// The action that is being awaited on.
52
    #[metric(help = "The action info of the AwaitedAction")]
53
    action_info: Arc<ActionInfo>,
54
55
    /// The operation id of the action.
56
    // If you need the client operation id, it may be set in
57
    // ActionState::operation_id.
58
    #[metric(help = "The operation id of the AwaitedAction")]
59
    operation_id: OperationId,
60
61
    /// The current sort key used to order the actions.
62
    #[metric(help = "The sort key of the AwaitedAction")]
63
    sort_key: AwaitedActionSortKey,
64
65
    /// The time the action was last updated.
66
    #[metric(help = "The last time the worker updated the AwaitedAction")]
67
    last_worker_updated_timestamp: SystemTime,
68
69
    /// Worker that is currently running this action, None if unassigned.
70
    #[metric(help = "The worker id of the AwaitedAction")]
71
    worker_id: Option<WorkerId>,
72
73
    /// The current state of the action.
74
    #[metric(help = "The state of the AwaitedAction")]
75
    state: Arc<ActionState>,
76
77
    /// Number of times the job has been attempted.
78
    #[metric(help = "The number of attempts the AwaitedAction has been tried")]
79
    pub attempts: usize,
80
}
81
82
impl AwaitedAction {
83
33
    pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>, now: SystemTime) -> Self {
84
33
        let stage = ActionStage::Queued;
85
33
        let sort_key = AwaitedActionSortKey::new_with_unique_key(
86
33
            action_info.priority,
87
33
            &action_info.insert_timestamp,
88
33
        );
89
33
        let state = Arc::new(ActionState {
90
33
            stage,
91
33
            // Note: We don't use the real client_operation_id here because
92
33
            // the only place AwaitedAction::new should ever be called is
93
33
            // when the action is first created and this struct will be stored
94
33
            // in the database, so we don't want to accidentally leak the
95
33
            // client_operation_id to all clients.
96
33
            client_operation_id: operation_id.clone(),
97
33
            action_digest: action_info.unique_qualifier.digest(),
98
33
        });
99
33
        Self {
100
33
            version: AwaitedActionVersion(0),
101
33
            action_info,
102
33
            operation_id,
103
33
            sort_key,
104
33
            attempts: 0,
105
33
            last_worker_updated_timestamp: now,
106
33
            worker_id: None,
107
33
            state,
108
33
        }
109
33
    }
110
111
80
    pub(crate) fn version(&self) -> u64 {
112
80
        self.version.0
113
80
    }
114
115
2
    pub(crate) fn set_version(&mut self, version: u64) {
116
2
        self.version = AwaitedActionVersion(version);
117
2
    }
118
119
39
    pub(crate) fn increment_version(&mut self) {
120
39
        self.version = AwaitedActionVersion(self.version.0 + 1);
121
39
    }
122
123
170
    pub fn action_info(&self) -> &Arc<ActionInfo> {
124
170
        &self.action_info
125
170
    }
126
127
91
    pub fn operation_id(&self) -> &OperationId {
128
91
        &self.operation_id
129
91
    }
130
131
64
    pub(crate) fn sort_key(&self) -> AwaitedActionSortKey {
132
64
        self.sort_key
133
64
    }
134
135
430
    pub fn state(&self) -> &Arc<ActionState> {
136
430
        &self.state
137
430
    }
138
139
53
    pub(crate) fn worker_id(&self) -> Option<WorkerId> {
140
53
        self.worker_id
141
53
    }
142
143
46
    pub(crate) fn last_worker_updated_timestamp(&self) -> SystemTime {
144
46
        self.last_worker_updated_timestamp
145
46
    }
146
147
79
    pub(crate) fn keep_alive(&mut self, now: SystemTime) {
148
79
        self.last_worker_updated_timestamp = now;
149
79
    }
150
151
    /// Sets the worker id that is currently processing this action.
152
41
    pub(crate) fn set_worker_id(&mut self, new_maybe_worker_id: Option<WorkerId>, now: SystemTime) {
153
41
        if self.worker_id != new_maybe_worker_id {
  Branch (153:12): [True: 35, False: 6]
  Branch (153:12): [Folded - Ignored]
154
35
            self.worker_id = new_maybe_worker_id;
155
35
            self.keep_alive(now);
156
35
        }
6
157
41
    }
158
159
    /// Sets the current state of the action.
160
    /// If `now` is provided, the last worker update timestamp is refreshed as well.
161
102
    pub fn set_state(&mut self, mut state: Arc<ActionState>, now: Option<SystemTime>) {
162
102
        std::mem::swap(&mut self.state, &mut state);
163
102
        if let Some(now) = now {
  Branch (163:16): [True: 44, False: 58]
  Branch (163:16): [Folded - Ignored]
164
44
            self.keep_alive(now);
165
58
        }
166
102
    }
167
}
168
169
impl TryFrom<&[u8]> for AwaitedAction {
170
    type Error = Error;
171
0
    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
172
0
        serde_json::from_slice(value)
173
0
            .map_err(|e| make_input_err!("{}", e.to_string()))
174
0
            .err_tip(|| "In AwaitedAction::TryFrom::&[u8]")
175
0
    }
176
}
177
178
/// The key used to sort the awaited actions.
179
///
180
/// The rules for sorting are as follows:
181
/// 1. priority of the action
182
/// 2. insert order of the action (lower = higher priority)
183
/// 3. (mostly random hash based on the action info; NOTE: the key built by `new` below encodes only the priority and the insert timestamp)
184
2
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
185
#[repr(transparent)]
186
pub struct AwaitedActionSortKey(u64);
187
188
impl MetricsComponent for AwaitedActionSortKey {
189
0
    fn publish(
190
0
        &self,
191
0
        _kind: MetricKind,
192
0
        _field_metadata: MetricFieldData,
193
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
194
0
        Ok(MetricPublishKnownKindData::Counter(self.0))
195
0
    }
196
}
197
198
impl AwaitedActionSortKey {
199
    #[rustfmt::skip]
200
33
    const fn new(priority: i32, insert_timestamp: u32) -> Self {
201
        // Shift `priority` so [`i32::MIN`] is represented by zero.
202
        // This makes it so any negative values are positive, but
203
        // maintains ordering.
204
        const MIN_I32: i64 = (i32::MIN as i64).abs();
205
33
        let priority = ((priority as i64 + MIN_I32) as u32).to_be_bytes();
206
33
207
33
        // Invert our timestamp so the larger the timestamp the lower the number.
208
33
        // This makes timestamp descending order instead of ascending.
209
33
        let timestamp = (insert_timestamp ^ u32::MAX).to_be_bytes();
210
33
211
33
        AwaitedActionSortKey(u64::from_be_bytes([
212
33
            priority[0], priority[1], priority[2], priority[3],
213
33
            timestamp[0], timestamp[1], timestamp[2], timestamp[3],
214
33
        ]))
215
33
    }
216
217
33
    fn new_with_unique_key(priority: i32, insert_timestamp: &SystemTime) -> Self {
218
33
        let timestamp = insert_timestamp
219
33
            .duration_since(UNIX_EPOCH)
220
33
            .unwrap()
221
33
            .as_secs() as u32;
222
33
        Self::new(priority, timestamp)
223
33
    }
224
225
2
    pub(crate) fn as_u64(self) -> u64 {
226
2
        self.0
227
2
    }
228
}
229
230
// Ensure the size of the sort key is the same as a `u64`.
231
assert_eq_size!(AwaitedActionSortKey, u64);
232
233
const_assert_eq!(
234
    AwaitedActionSortKey::new(0x1234_5678, 0x9abc_def0).0,
235
    // Note: Result has 0x12345678 + 0x80000000 = 0x92345678 because we need
236
    // to shift the `i32::MIN` value to be represented by zero.
237
    // Note: `6543210f` are the inverted bits of `9abcdef0`.
238
    // This effectively inverts the priority to now have the highest priority
239
    // be the lowest timestamps.
240
    AwaitedActionSortKey(0x9234_5678_6543_210f).0
241
);
242
// Ensure the priority is used as the sort key first.
243
const_assert!(
244
    AwaitedActionSortKey::new(i32::MAX, 0).0 > AwaitedActionSortKey::new(i32::MAX - 1, 0).0
245
);
246
const_assert!(AwaitedActionSortKey::new(i32::MAX - 1, 0).0 > AwaitedActionSortKey::new(1, 0).0);
247
const_assert!(AwaitedActionSortKey::new(1, 0).0 > AwaitedActionSortKey::new(0, 0).0);
248
const_assert!(AwaitedActionSortKey::new(0, 0).0 > AwaitedActionSortKey::new(-1, 0).0);
249
const_assert!(AwaitedActionSortKey::new(-1, 0).0 > AwaitedActionSortKey::new(i32::MIN + 1, 0).0);
250
const_assert!(
251
    AwaitedActionSortKey::new(i32::MIN + 1, 0).0 > AwaitedActionSortKey::new(i32::MIN, 0).0
252
);
253
254
// Ensure the insert timestamp is used as the sort key second.
255
const_assert!(AwaitedActionSortKey::new(0, u32::MIN).0 > AwaitedActionSortKey::new(0, u32::MAX).0);