Coverage Report

Created: 2025-05-30 16:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
use std::time::{SystemTime, UNIX_EPOCH};
17
18
use nativelink_error::{Error, ResultExt, make_input_err};
19
use nativelink_metric::{
20
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
21
};
22
use nativelink_util::action_messages::{
23
    ActionInfo, ActionStage, ActionState, OperationId, WorkerId,
24
};
25
use nativelink_util::origin_event::OriginMetadata;
26
use opentelemetry::baggage::BaggageExt;
27
use opentelemetry::context::Context;
28
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
29
use serde::{Deserialize, Serialize};
30
use static_assertions::{assert_eq_size, const_assert, const_assert_eq};
31
32
/// The version of the awaited action.
33
/// This number will always increment by one each time
34
/// the action is updated.
35
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
36
struct AwaitedActionVersion(u64);
37
38
impl MetricsComponent for AwaitedActionVersion {
39
0
    fn publish(
40
0
        &self,
41
0
        _kind: MetricKind,
42
0
        _field_metadata: MetricFieldData,
43
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
44
0
        Ok(MetricPublishKnownKindData::Counter(self.0))
45
0
    }
46
}
47
48
/// An action that is being awaited on and last known state.
49
#[derive(Debug, Clone, MetricsComponent, Serialize, Deserialize)]
50
pub struct AwaitedAction {
51
    /// The current version of the action.
52
    #[metric(help = "The version of the AwaitedAction")]
53
    version: AwaitedActionVersion,
54
55
    /// The action that is being awaited on.
56
    #[metric(help = "The action info of the AwaitedAction")]
57
    action_info: Arc<ActionInfo>,
58
59
    /// The operation id of the action.
60
    // If you need the client operation id, it may be set in
61
    // ActionState::operation_id.
62
    #[metric(help = "The operation id of the AwaitedAction")]
63
    operation_id: OperationId,
64
65
    /// The current sort key used to order the actions.
66
    #[metric(help = "The sort key of the AwaitedAction")]
67
    sort_key: AwaitedActionSortKey,
68
69
    /// The time the action was last updated.
70
    #[metric(help = "The last time the worker updated the AwaitedAction")]
71
    last_worker_updated_timestamp: SystemTime,
72
73
    /// The last time the client sent a keepalive message.
74
    #[metric(help = "The last time the client sent a keepalive message")]
75
    last_client_keepalive_timestamp: SystemTime,
76
77
    /// Worker that is currently running this action, None if unassigned.
78
    #[metric(help = "The worker id of the AwaitedAction")]
79
    worker_id: Option<WorkerId>,
80
81
    /// The current state of the action.
82
    #[metric(help = "The state of the AwaitedAction")]
83
    state: Arc<ActionState>,
84
85
    /// The origin metadata of the action.
86
    maybe_origin_metadata: Option<OriginMetadata>,
87
88
    /// Number of attempts the job has been tried.
89
    #[metric(help = "The number of attempts the AwaitedAction has been tried")]
90
    pub attempts: usize,
91
}
92
93
impl AwaitedAction {
94
34
    pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>, now: SystemTime) -> Self {
95
34
        let sort_key = AwaitedActionSortKey::new_with_unique_key(
96
34
            action_info.priority,
97
34
            &action_info.insert_timestamp,
98
        );
99
34
        let action_state = Arc::new(ActionState {
100
34
            stage: ActionStage::Queued,
101
34
            // Note: We don't use the real client_operation_id here because
102
34
            // the only place AwaitedAction::new should ever be called is
103
34
            // when the action is first created and this struct will be stored
104
34
            // in the database, so we don't want to accidentally leak the
105
34
            // client_operation_id to all clients.
106
34
            client_operation_id: operation_id.clone(),
107
34
            action_digest: action_info.unique_qualifier.digest(),
108
34
        });
109
110
34
        let ctx = Context::current();
111
34
        let baggage = ctx.baggage();
112
113
34
        let maybe_origin_metadata = if baggage.is_empty() {
  Branch (113:40): [True: 30, False: 4]
  Branch (113:40): [Folded - Ignored]
114
30
            None
115
        } else {
116
            Some(OriginMetadata {
117
4
                identity: baggage
118
4
                    .get(ENDUSER_ID)
119
4
                    .map(|v| v.as_str().to_string())
120
4
                    .unwrap_or_default(),
121
4
                bazel_metadata: None, // TODO(aaronmondal): Implement conversion.
122
            })
123
        };
124
125
34
        Self {
126
34
            version: AwaitedActionVersion(0),
127
34
            action_info,
128
34
            operation_id,
129
34
            sort_key,
130
34
            attempts: 0,
131
34
            last_worker_updated_timestamp: now,
132
34
            last_client_keepalive_timestamp: now,
133
34
            maybe_origin_metadata,
134
34
            worker_id: None,
135
34
            state: action_state,
136
34
        }
137
34
    }
138
139
80
    pub(crate) const fn version(&self) -> u64 {
140
80
        self.version.0
141
80
    }
142
143
2
    pub(crate) const fn set_version(&mut self, version: u64) {
144
2
        self.version = AwaitedActionVersion(version);
145
2
    }
146
147
39
    pub(crate) const fn increment_version(&mut self) {
148
39
        self.version = AwaitedActionVersion(self.version.0 + 1);
149
39
    }
150
151
173
    pub const fn action_info(&self) -> &Arc<ActionInfo> {
152
173
        &self.action_info
153
173
    }
154
155
91
    pub const fn operation_id(&self) -> &OperationId {
156
91
        &self.operation_id
157
91
    }
158
159
65
    pub(crate) const fn sort_key(&self) -> AwaitedActionSortKey {
160
65
        self.sort_key
161
65
    }
162
163
877
    pub const fn state(&self) -> &Arc<ActionState> {
164
877
        &self.state
165
877
    }
166
167
120
    pub(crate) const fn maybe_origin_metadata(&self) -> Option<&OriginMetadata> {
168
120
        self.maybe_origin_metadata.as_ref()
169
120
    }
170
171
53
    pub(crate) const fn worker_id(&self) -> Option<&WorkerId> {
172
53
        self.worker_id.as_ref()
173
53
    }
174
175
549
    pub(crate) const fn last_worker_updated_timestamp(&self) -> SystemTime {
176
549
        self.last_worker_updated_timestamp
177
549
    }
178
179
77
    pub(crate) const fn worker_keep_alive(&mut self, now: SystemTime) {
180
77
        self.last_worker_updated_timestamp = now;
181
77
    }
182
183
550
    pub(crate) const fn last_client_keepalive_timestamp(&self) -> SystemTime {
184
550
        self.last_client_keepalive_timestamp
185
550
    }
186
187
57
    pub(crate) const fn update_client_keep_alive(&mut self, now: SystemTime) {
188
57
        self.last_client_keepalive_timestamp = now;
189
57
    }
190
191
62
    pub(crate) fn set_client_operation_id(&mut self, client_operation_id: OperationId) {
192
62
        Arc::make_mut(&mut self.state).client_operation_id = client_operation_id;
193
62
    }
194
195
    /// Sets the worker id that is currently processing this action.
196
41
    pub(crate) fn set_worker_id(&mut self, new_maybe_worker_id: Option<WorkerId>, now: SystemTime) {
197
41
        if self.worker_id != new_maybe_worker_id {
  Branch (197:12): [True: 35, False: 6]
  Branch (197:12): [Folded - Ignored]
198
35
            self.worker_id = new_maybe_worker_id;
199
35
            self.worker_keep_alive(now);
200
35
        
}6
201
41
    }
202
203
    /// Sets the current state of the action and updates the last worker updated timestamp.
204
42
    pub fn worker_set_state(&mut self, mut state: Arc<ActionState>, now: SystemTime) {
205
42
        core::mem::swap(&mut self.state, &mut state);
206
42
        self.worker_keep_alive(now);
207
42
    }
208
}
209
210
impl TryFrom<&[u8]> for AwaitedAction {
211
    type Error = Error;
212
0
    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
213
0
        serde_json::from_slice(value)
214
0
            .map_err(|e| make_input_err!("{}", e.to_string()))
215
0
            .err_tip(|| "In AwaitedAction::TryFrom::&[u8]")
216
0
    }
217
}
218
219
/// The key used to sort the awaited actions.
220
///
221
/// The rules for sorting are as follows:
222
/// 1. priority of the action
223
/// 2. insert order of the action (lower = higher priority)
224
/// 3. (mostly random hash based on the action info)
225
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
226
#[repr(transparent)]
227
pub struct AwaitedActionSortKey(u64);
228
229
impl MetricsComponent for AwaitedActionSortKey {
230
0
    fn publish(
231
0
        &self,
232
0
        _kind: MetricKind,
233
0
        _field_metadata: MetricFieldData,
234
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
235
0
        Ok(MetricPublishKnownKindData::Counter(self.0))
236
0
    }
237
}
238
239
impl AwaitedActionSortKey {
240
    #[rustfmt::skip]
241
34
    const fn new(priority: i32, insert_timestamp: u32) -> Self {
242
        // Shift `priority` so [`i32::MIN`] is represented by zero.
243
        // This makes it so any negative values are positive, but
244
        // maintains ordering.
245
        const MIN_I32: i64 = (i32::MIN as i64).abs();
246
34
        let priority = ((priority as i64 + MIN_I32) as u32).to_be_bytes();
247
248
        // Invert our timestamp so the larger the timestamp the lower the number.
249
        // This makes timestamp descending order instead of ascending.
250
34
        let timestamp = (insert_timestamp ^ u32::MAX).to_be_bytes();
251
252
34
        Self(u64::from_be_bytes([
253
34
            priority[0], priority[1], priority[2], priority[3],
254
34
            timestamp[0], timestamp[1], timestamp[2], timestamp[3],
255
34
        ]))
256
34
    }
257
258
34
    fn new_with_unique_key(priority: i32, insert_timestamp: &SystemTime) -> Self {
259
34
        let timestamp = insert_timestamp
260
34
            .duration_since(UNIX_EPOCH)
261
34
            .unwrap()
262
34
            .as_secs() as u32;
263
34
        Self::new(priority, timestamp)
264
34
    }
265
266
2
    pub(crate) const fn as_u64(self) -> u64 {
267
2
        self.0
268
2
    }
269
}
270
271
// Ensure the size of the sort key is the same as a `u64`.
272
assert_eq_size!(AwaitedActionSortKey, u64);
273
274
const_assert_eq!(
275
    AwaitedActionSortKey::new(0x1234_5678, 0x9abc_def0).0,
276
    // Note: Result has 0x12345678 + 0x80000000 = 0x92345678 because we need
277
    // to shift the `i32::MIN` value to be represented by zero.
278
    // Note: `6543210f` are the inverted bits of `9abcdef0`.
279
    // This effectively inverts the priority to now have the highest priority
280
    // be the lowest timestamps.
281
    AwaitedActionSortKey(0x9234_5678_6543_210f).0
282
);
283
// Ensure the priority is used as the sort key first.
284
const_assert!(
285
    AwaitedActionSortKey::new(i32::MAX, 0).0 > AwaitedActionSortKey::new(i32::MAX - 1, 0).0
286
);
287
const_assert!(AwaitedActionSortKey::new(i32::MAX - 1, 0).0 > AwaitedActionSortKey::new(1, 0).0);
288
const_assert!(AwaitedActionSortKey::new(1, 0).0 > AwaitedActionSortKey::new(0, 0).0);
289
const_assert!(AwaitedActionSortKey::new(0, 0).0 > AwaitedActionSortKey::new(-1, 0).0);
290
const_assert!(AwaitedActionSortKey::new(-1, 0).0 > AwaitedActionSortKey::new(i32::MIN + 1, 0).0);
291
const_assert!(
292
    AwaitedActionSortKey::new(i32::MIN + 1, 0).0 > AwaitedActionSortKey::new(i32::MIN, 0).0
293
);
294
295
// Ensure the insert timestamp is used as the sort key second.
296
const_assert!(AwaitedActionSortKey::new(0, u32::MIN).0 > AwaitedActionSortKey::new(0, u32::MAX).0);