Coverage Report

Created: 2026-06-04 10:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
use std::time::{SystemTime, UNIX_EPOCH};
17
18
use nativelink_error::{Error, ResultExt, make_input_err};
19
use nativelink_metric::{
20
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
21
};
22
use nativelink_util::action_messages::{
23
    ActionInfo, ActionStage, ActionState, OperationId, WorkerId,
24
};
25
use nativelink_util::origin_event::OriginMetadata;
26
use opentelemetry::baggage::BaggageExt;
27
use opentelemetry::context::Context;
28
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
29
use serde::{Deserialize, Serialize};
30
use static_assertions::{assert_eq_size, const_assert, const_assert_eq};
31
32
/// The version of the awaited action.
///
/// `AwaitedAction::new` starts it at 0 and `AwaitedAction::increment_version`
/// bumps it by exactly one. Note that `AwaitedAction::set_version` can also
/// overwrite it with an arbitrary value, so it is not strictly monotonic from
/// this type's point of view.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
struct AwaitedActionVersion(i64);
37
38
impl MetricsComponent for AwaitedActionVersion {
39
0
    fn publish(
40
0
        &self,
41
0
        _kind: MetricKind,
42
0
        _field_metadata: MetricFieldData,
43
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
44
0
        Ok(MetricPublishKnownKindData::Counter(u64::from_ne_bytes(
45
0
            self.0.to_ne_bytes(),
46
0
        )))
47
0
    }
48
}
49
50
/// An action that is being awaited on, together with its last known state.
///
/// Serialized with serde (see the `TryFrom<&[u8]>` impl, which decodes JSON),
/// so field order here determines the serialized field order.
#[derive(Debug, Clone, MetricsComponent, Serialize, Deserialize)]
pub struct AwaitedAction {
    /// The current version of the action.
    #[metric(help = "The version of the AwaitedAction")]
    version: AwaitedActionVersion,

    /// The action that is being awaited on.
    #[metric(help = "The action info of the AwaitedAction")]
    action_info: Arc<ActionInfo>,

    /// The operation id of the action.
    // If you need the client operation id, it may be set in
    // ActionState::operation_id.
    #[metric(help = "The operation id of the AwaitedAction")]
    operation_id: OperationId,

    /// The current sort key used to order the actions.
    /// Derived from the action's priority and insert timestamp at creation.
    #[metric(help = "The sort key of the AwaitedAction")]
    sort_key: AwaitedActionSortKey,

    /// The time a worker last updated the action.
    /// Refreshed by `worker_keep_alive` (also called from `set_worker_id` and
    /// `worker_set_state`).
    #[metric(help = "The last time the worker updated the AwaitedAction")]
    last_worker_updated_timestamp: SystemTime,

    /// The last time the client sent a keepalive message.
    #[metric(help = "The last time the client sent a keepalive message")]
    last_client_keepalive_timestamp: SystemTime,

    /// Worker that is currently running this action, None if unassigned.
    #[metric(help = "The worker id of the AwaitedAction")]
    worker_id: Option<WorkerId>,

    /// The current state of the action.
    #[metric(help = "The state of the AwaitedAction")]
    state: Arc<ActionState>,

    /// The origin metadata of the action, captured from the OpenTelemetry
    /// baggage when the action is created (None if the baggage was empty).
    /// NOTE(review): unlike the other fields this has no `#[metric]`
    /// attribute — confirm whether that omission is intentional.
    maybe_origin_metadata: Option<OriginMetadata>,

    /// Number of attempts the job has been tried.
    /// Starts at 0; none of the methods in this file modify it, so callers
    /// update this public field directly.
    #[metric(help = "The number of attempts the AwaitedAction has been tried")]
    pub attempts: usize,
}
94
95
impl AwaitedAction {
96
42
    pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>, now: SystemTime) -> Self {
97
42
        let sort_key = AwaitedActionSortKey::new_with_unique_key(
98
42
            action_info.priority,
99
42
            &action_info.insert_timestamp,
100
        );
101
42
        let action_state = Arc::new(ActionState {
102
42
            stage: ActionStage::Queued,
103
42
            // Note: We don't use the real client_operation_id here because
104
42
            // the only place AwaitedAction::new should ever be called is
105
42
            // when the action is first created and this struct will be stored
106
42
            // in the database, so we don't want to accidentally leak the
107
42
            // client_operation_id to all clients.
108
42
            client_operation_id: operation_id.clone(),
109
42
            action_digest: action_info.unique_qualifier.digest(),
110
42
            last_transition_timestamp: now,
111
42
        });
112
113
42
        let ctx = Context::current();
114
42
        let baggage = ctx.baggage();
115
116
42
        let maybe_origin_metadata = if baggage.is_empty() {
117
38
            None
118
        } else {
119
            Some(OriginMetadata {
120
4
                identity: baggage
121
4
                    .get(ENDUSER_ID)
122
4
                    .map(|v| v.as_str().to_string())
123
4
                    .unwrap_or_default(),
124
4
                bazel_metadata: None, // TODO(palfrey): Implement conversion.
125
            })
126
        };
127
128
42
        Self {
129
42
            version: AwaitedActionVersion(0),
130
42
            action_info,
131
42
            operation_id,
132
42
            sort_key,
133
42
            attempts: 0,
134
42
            last_worker_updated_timestamp: now,
135
42
            last_client_keepalive_timestamp: now,
136
42
            maybe_origin_metadata,
137
42
            worker_id: None,
138
42
            state: action_state,
139
42
        }
140
42
    }
141
142
105
    pub(crate) const fn version(&self) -> i64 {
143
105
        self.version.0
144
105
    }
145
146
57
    pub(crate) const fn set_version(&mut self, version: i64) {
147
57
        self.version = AwaitedActionVersion(version);
148
57
    }
149
150
43
    pub(crate) const fn increment_version(&mut self) {
151
43
        self.version = AwaitedActionVersion(self.version.0 + 1);
152
43
    }
153
154
419
    pub const fn action_info(&self) -> &Arc<ActionInfo> {
155
419
        &self.action_info
156
419
    }
157
158
152
    pub const fn operation_id(&self) -> &OperationId {
159
152
        &self.operation_id
160
152
    }
161
162
85
    pub(crate) const fn sort_key(&self) -> AwaitedActionSortKey {
163
85
        self.sort_key
164
85
    }
165
166
1.18k
    pub const fn state(&self) -> &Arc<ActionState> {
167
1.18k
        &self.state
168
1.18k
    }
169
170
16
    pub fn is_complete(&self) -> bool {
171
16
        match &self.state.stage {
172
            ActionStage::Unknown
173
            | ActionStage::CacheCheck
174
            | ActionStage::Queued
175
15
            | ActionStage::Executing => false,
176
1
            ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => true,
177
        }
178
16
    }
179
180
151
    pub(crate) const fn maybe_origin_metadata(&self) -> Option<&OriginMetadata> {
181
151
        self.maybe_origin_metadata.as_ref()
182
151
    }
183
184
136
    pub(crate) const fn worker_id(&self) -> Option<&WorkerId> {
185
136
        self.worker_id.as_ref()
186
136
    }
187
188
564
    pub(crate) const fn last_worker_updated_timestamp(&self) -> SystemTime {
189
564
        self.last_worker_updated_timestamp
190
564
    }
191
192
105
    pub(crate) const fn worker_keep_alive(&mut self, now: SystemTime) {
193
105
        self.last_worker_updated_timestamp = now;
194
105
    }
195
196
606
    pub(crate) const fn last_client_keepalive_timestamp(&self) -> SystemTime {
197
606
        self.last_client_keepalive_timestamp
198
606
    }
199
200
60
    pub(crate) const fn update_client_keep_alive(&mut self, now: SystemTime) {
201
60
        self.last_client_keepalive_timestamp = now;
202
60
    }
203
204
70
    pub(crate) fn set_client_operation_id(&mut self, client_operation_id: OperationId) {
205
70
        Arc::make_mut(&mut self.state).client_operation_id = client_operation_id;
206
70
    }
207
208
    /// Sets the worker id that is currently processing this action.
209
56
    pub(crate) fn set_worker_id(&mut self, new_maybe_worker_id: Option<WorkerId>, now: SystemTime) {
210
56
        if self.worker_id != new_maybe_worker_id {
211
48
            self.worker_id = new_maybe_worker_id;
212
48
            self.worker_keep_alive(now);
213
48
        
}8
214
56
    }
215
216
    /// Sets the current state of the action and updates the last worker updated timestamp.
217
57
    pub fn worker_set_state(&mut self, mut state: Arc<ActionState>, now: SystemTime) {
218
57
        core::mem::swap(&mut self.state, &mut state);
219
57
        self.worker_keep_alive(now);
220
57
    }
221
}
222
223
impl TryFrom<&[u8]> for AwaitedAction {
224
    type Error = Error;
225
0
    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
226
0
        serde_json::from_slice(value)
227
0
            .map_err(|e| make_input_err!("{}", e.to_string()))
228
0
            .err_tip(|| "In AwaitedAction::TryFrom::&[u8]")
229
0
    }
230
}
231
232
/// The key used to sort the awaited actions.
///
/// The rules for sorting are as follows:
/// 1. priority of the action (stored in the high 32 bits, so it always
///    dominates the timestamp);
/// 2. insert order of the action (lower timestamp = higher priority; stored
///    bit-inverted in the low 32 bits).
///
/// As built by `AwaitedActionSortKey::new`, the key contains no further
/// tie-breaker: two actions with the same priority inserted within the same
/// second produce equal keys.
///
/// A larger key means "more urgent". NOTE(review): this presumes consumers
/// take the largest key first — confirm against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct AwaitedActionSortKey(u64);
241
242
impl MetricsComponent for AwaitedActionSortKey {
243
0
    fn publish(
244
0
        &self,
245
0
        _kind: MetricKind,
246
0
        _field_metadata: MetricFieldData,
247
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
248
0
        Ok(MetricPublishKnownKindData::Counter(self.0))
249
0
    }
250
}
251
252
impl AwaitedActionSortKey {
253
42
    const fn new(priority: i32, insert_timestamp: u32) -> Self {
254
        // Shift the signed i32 range [i32::MIN, i32::MAX] to the unsigned u32 range
255
        // [0, u32::MAX] to preserve ordering when we convert to bytes for sorting.
256
42
        let priority_u32 = i32::MIN.unsigned_abs().wrapping_add_signed(priority);
257
42
        let priority = priority_u32.to_be_bytes();
258
259
        // Invert our timestamp so the larger the timestamp the lower the number.
260
        // This makes timestamp descending order instead of ascending.
261
42
        let timestamp = (insert_timestamp ^ u32::MAX).to_be_bytes();
262
263
42
        Self(u64::from_be_bytes([
264
42
            priority[0],
265
42
            priority[1],
266
42
            priority[2],
267
42
            priority[3],
268
42
            timestamp[0],
269
42
            timestamp[1],
270
42
            timestamp[2],
271
42
            timestamp[3],
272
42
        ]))
273
42
    }
274
275
42
    fn new_with_unique_key(priority: i32, insert_timestamp: &SystemTime) -> Self {
276
42
        let timestamp = u32::try_from(
277
42
            insert_timestamp
278
42
                .duration_since(UNIX_EPOCH)
279
42
                .unwrap()
280
42
                .as_secs(),
281
        )
282
42
        .unwrap_or(u32::MAX);
283
42
        Self::new(priority, timestamp)
284
42
    }
285
286
16
    pub(crate) const fn as_u64(self) -> u64 {
287
16
        self.0
288
16
    }
289
}
290
291
// Ensure the size of the sort key is the same as a `u64`.
292
assert_eq_size!(AwaitedActionSortKey, u64);
293
294
const_assert_eq!(
295
    AwaitedActionSortKey::new(0x1234_5678, 0x9abc_def0).0,
296
    // Note: Result has 0x12345678 + 0x80000000 = 0x92345678 because we need
297
    // to shift the `i32::MIN` value to be represented by zero.
298
    // Note: `6543210f` are the inverted bits of `9abcdef0`.
299
    // This effectively inverts the priority to now have the highest priority
300
    // be the lowest timestamps.
301
    AwaitedActionSortKey(0x9234_5678_6543_210f).0
302
);
303
// Ensure the priority is used as the sort key first.
304
const_assert!(
305
    AwaitedActionSortKey::new(i32::MAX, 0).0 > AwaitedActionSortKey::new(i32::MAX - 1, 0).0
306
);
307
const_assert!(AwaitedActionSortKey::new(i32::MAX - 1, 0).0 > AwaitedActionSortKey::new(1, 0).0);
308
const_assert!(AwaitedActionSortKey::new(1, 0).0 > AwaitedActionSortKey::new(0, 0).0);
309
const_assert!(AwaitedActionSortKey::new(0, 0).0 > AwaitedActionSortKey::new(-1, 0).0);
310
const_assert!(AwaitedActionSortKey::new(-1, 0).0 > AwaitedActionSortKey::new(i32::MIN + 1, 0).0);
311
const_assert!(
312
    AwaitedActionSortKey::new(i32::MIN + 1, 0).0 > AwaitedActionSortKey::new(i32::MIN, 0).0
313
);
314
315
// Ensure the insert timestamp is used as the sort key second.
316
const_assert!(AwaitedActionSortKey::new(0, u32::MIN).0 > AwaitedActionSortKey::new(0, u32::MAX).0);