Coverage Report

Created: 2025-12-16 15:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
use std::time::{SystemTime, UNIX_EPOCH};
17
18
use nativelink_error::{Error, ResultExt, make_input_err};
19
use nativelink_metric::{
20
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
21
};
22
use nativelink_util::action_messages::{
23
    ActionInfo, ActionStage, ActionState, OperationId, WorkerId,
24
};
25
use nativelink_util::origin_event::OriginMetadata;
26
use opentelemetry::baggage::BaggageExt;
27
use opentelemetry::context::Context;
28
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
29
use serde::{Deserialize, Serialize};
30
use static_assertions::{assert_eq_size, const_assert, const_assert_eq};
31
32
/// The version of the awaited action.
33
/// This number will always increment by one each time
34
/// the action is updated.
35
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
36
struct AwaitedActionVersion(i64);
37
38
impl MetricsComponent for AwaitedActionVersion {
39
0
    fn publish(
40
0
        &self,
41
0
        _kind: MetricKind,
42
0
        _field_metadata: MetricFieldData,
43
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
44
0
        Ok(MetricPublishKnownKindData::Counter(u64::from_ne_bytes(
45
0
            self.0.to_ne_bytes(),
46
0
        )))
47
0
    }
48
}
49
50
/// An action that is being awaited on and last known state.
51
#[derive(Debug, Clone, MetricsComponent, Serialize, Deserialize)]
52
pub struct AwaitedAction {
53
    /// The current version of the action.
54
    #[metric(help = "The version of the AwaitedAction")]
55
    version: AwaitedActionVersion,
56
57
    /// The action that is being awaited on.
58
    #[metric(help = "The action info of the AwaitedAction")]
59
    action_info: Arc<ActionInfo>,
60
61
    /// The operation id of the action.
62
    // If you need the client operation id, it may be set in
63
    // ActionState::operation_id.
64
    #[metric(help = "The operation id of the AwaitedAction")]
65
    operation_id: OperationId,
66
67
    /// The current sort key used to order the actions.
68
    #[metric(help = "The sort key of the AwaitedAction")]
69
    sort_key: AwaitedActionSortKey,
70
71
    /// The last time a worker updated the action.
72
    #[metric(help = "The last time the worker updated the AwaitedAction")]
73
    last_worker_updated_timestamp: SystemTime,
74
75
    /// The last time the client sent a keepalive message.
76
    #[metric(help = "The last time the client sent a keepalive message")]
77
    last_client_keepalive_timestamp: SystemTime,
78
79
    /// Worker that is currently running this action, None if unassigned.
80
    #[metric(help = "The worker id of the AwaitedAction")]
81
    worker_id: Option<WorkerId>,
82
83
    /// The current state of the action.
84
    #[metric(help = "The state of the AwaitedAction")]
85
    state: Arc<ActionState>,
86
87
    /// The origin metadata of the action.
88
    maybe_origin_metadata: Option<OriginMetadata>,
89
90
    /// Number of times execution of the action has been attempted.
91
    #[metric(help = "The number of attempts the AwaitedAction has been tried")]
92
    pub attempts: usize,
93
}
94
95
impl AwaitedAction {
96
35
    pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>, now: SystemTime) -> Self {
97
35
        let sort_key = AwaitedActionSortKey::new_with_unique_key(
98
35
            action_info.priority,
99
35
            &action_info.insert_timestamp,
100
        );
101
35
        let action_state = Arc::new(ActionState {
102
35
            stage: ActionStage::Queued,
103
35
            // Note: We don't use the real client_operation_id here because
104
35
            // the only place AwaitedAction::new should ever be called is
105
35
            // when the action is first created and this struct will be stored
106
35
            // in the database, so we don't want to accidentally leak the
107
35
            // client_operation_id to all clients.
108
35
            client_operation_id: operation_id.clone(),
109
35
            action_digest: action_info.unique_qualifier.digest(),
110
35
            last_transition_timestamp: now,
111
35
        });
112
113
35
        let ctx = Context::current();
114
35
        let baggage = ctx.baggage();
115
116
35
        let maybe_origin_metadata = if baggage.is_empty() {
  Branch (116:40): [True: 31, False: 4]
  Branch (116:40): [Folded - Ignored]
117
31
            None
118
        } else {
119
            Some(OriginMetadata {
120
4
                identity: baggage
121
4
                    .get(ENDUSER_ID)
122
4
                    .map(|v| v.as_str().to_string())
123
4
                    .unwrap_or_default(),
124
4
                bazel_metadata: None, // TODO(palfrey): Implement conversion.
125
            })
126
        };
127
128
35
        Self {
129
35
            version: AwaitedActionVersion(0),
130
35
            action_info,
131
35
            operation_id,
132
35
            sort_key,
133
35
            attempts: 0,
134
35
            last_worker_updated_timestamp: now,
135
35
            last_client_keepalive_timestamp: now,
136
35
            maybe_origin_metadata,
137
35
            worker_id: None,
138
35
            state: action_state,
139
35
        }
140
35
    }
141
142
94
    pub(crate) const fn version(&self) -> i64 {
143
94
        self.version.0
144
94
    }
145
146
36
    pub(crate) const fn set_version(&mut self, version: i64) {
147
36
        self.version = AwaitedActionVersion(version);
148
36
    }
149
150
39
    pub(crate) const fn increment_version(&mut self) {
151
39
        self.version = AwaitedActionVersion(self.version.0 + 1);
152
39
    }
153
154
371
    pub const fn action_info(&self) -> &Arc<ActionInfo> {
155
371
        &self.action_info
156
371
    }
157
158
130
    pub const fn operation_id(&self) -> &OperationId {
159
130
        &self.operation_id
160
130
    }
161
162
76
    pub(crate) const fn sort_key(&self) -> AwaitedActionSortKey {
163
76
        self.sort_key
164
76
    }
165
166
1.06k
    pub const fn state(&self) -> &Arc<ActionState> {
167
1.06k
        &self.state
168
1.06k
    }
169
170
134
    pub(crate) const fn maybe_origin_metadata(&self) -> Option<&OriginMetadata> {
171
134
        self.maybe_origin_metadata.as_ref()
172
134
    }
173
174
115
    pub(crate) const fn worker_id(&self) -> Option<&WorkerId> {
175
115
        self.worker_id.as_ref()
176
115
    }
177
178
556
    pub(crate) const fn last_worker_updated_timestamp(&self) -> SystemTime {
179
556
        self.last_worker_updated_timestamp
180
556
    }
181
182
91
    pub(crate) const fn worker_keep_alive(&mut self, now: SystemTime) {
183
91
        self.last_worker_updated_timestamp = now;
184
91
    }
185
186
582
    pub(crate) const fn last_client_keepalive_timestamp(&self) -> SystemTime {
187
582
        self.last_client_keepalive_timestamp
188
582
    }
189
190
60
    pub(crate) const fn update_client_keep_alive(&mut self, now: SystemTime) {
191
60
        self.last_client_keepalive_timestamp = now;
192
60
    }
193
194
66
    pub(crate) fn set_client_operation_id(&mut self, client_operation_id: OperationId) {
195
66
        Arc::make_mut(&mut self.state).client_operation_id = client_operation_id;
196
66
    }
197
198
    /// Sets the worker id that is currently processing this action.
199
49
    pub(crate) fn set_worker_id(&mut self, new_maybe_worker_id: Option<WorkerId>, now: SystemTime) {
200
49
        if self.worker_id != new_maybe_worker_id {
  Branch (200:12): [True: 42, False: 7]
  Branch (200:12): [Folded - Ignored]
201
42
            self.worker_id = new_maybe_worker_id;
202
42
            self.worker_keep_alive(now);
203
42
        
        }
204
49
    }
205
206
    /// Sets the current state of the action and updates the last worker updated timestamp.
207
49
    pub fn worker_set_state(&mut self, mut state: Arc<ActionState>, now: SystemTime) {
208
49
        core::mem::swap(&mut self.state, &mut state);
209
49
        self.worker_keep_alive(now);
210
49
    }
211
}
212
213
impl TryFrom<&[u8]> for AwaitedAction {
214
    type Error = Error;
215
0
    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
216
0
        serde_json::from_slice(value)
217
0
            .map_err(|e| make_input_err!("{}", e.to_string()))
218
0
            .err_tip(|| "In AwaitedAction::TryFrom::&[u8]")
219
0
    }
220
}
221
222
/// The key used to sort the awaited actions.
223
///
224
/// The rules for sorting are as follows:
225
/// 1. priority of the action
226
/// 2. insert order of the action (lower = higher priority)
227
/// 3. (mostly random hash based on the action info)
228
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
229
#[repr(transparent)]
230
pub struct AwaitedActionSortKey(u64);
231
232
impl MetricsComponent for AwaitedActionSortKey {
233
0
    fn publish(
234
0
        &self,
235
0
        _kind: MetricKind,
236
0
        _field_metadata: MetricFieldData,
237
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
238
0
        Ok(MetricPublishKnownKindData::Counter(self.0))
239
0
    }
240
}
241
242
impl AwaitedActionSortKey {
243
35
    const fn new(priority: i32, insert_timestamp: u32) -> Self {
244
        // Shift the signed i32 range [i32::MIN, i32::MAX] to the unsigned u32 range
245
        // [0, u32::MAX] to preserve ordering when we convert to bytes for sorting.
246
35
        let priority_u32 = i32::MIN.unsigned_abs().wrapping_add_signed(priority);
247
35
        let priority = priority_u32.to_be_bytes();
248
249
        // Invert our timestamp so the larger the timestamp the lower the number.
250
        // This makes timestamp descending order instead of ascending.
251
35
        let timestamp = (insert_timestamp ^ u32::MAX).to_be_bytes();
252
253
35
        Self(u64::from_be_bytes([
254
35
            priority[0],
255
35
            priority[1],
256
35
            priority[2],
257
35
            priority[3],
258
35
            timestamp[0],
259
35
            timestamp[1],
260
35
            timestamp[2],
261
35
            timestamp[3],
262
35
        ]))
263
35
    }
264
265
35
    fn new_with_unique_key(priority: i32, insert_timestamp: &SystemTime) -> Self {
266
35
        let timestamp = u32::try_from(
267
35
            insert_timestamp
268
35
                .duration_since(UNIX_EPOCH)
269
35
                .unwrap()
270
35
                .as_secs(),
271
        )
272
35
        .unwrap_or(u32::MAX);
273
35
        Self::new(priority, timestamp)
274
35
    }
275
276
13
    pub(crate) const fn as_u64(self) -> u64 {
277
13
        self.0
278
13
    }
279
}
280
281
// Ensure the size of the sort key is the same as a `u64`.
282
assert_eq_size!(AwaitedActionSortKey, u64);
283
284
const_assert_eq!(
285
    AwaitedActionSortKey::new(0x1234_5678, 0x9abc_def0).0,
286
    // Note: Result has 0x12345678 + 0x80000000 = 0x92345678 because we need
287
    // to shift the `i32::MIN` value to be represented by zero.
288
    // Note: `6543210f` are the inverted bits of `9abcdef0`.
289
    // Inverting the timestamp bits means that, within the same priority,
290
    // the lowest (earliest) timestamps produce the highest sort keys.
291
    AwaitedActionSortKey(0x9234_5678_6543_210f).0
292
);
293
// Ensure the priority is used as the sort key first.
294
const_assert!(
295
    AwaitedActionSortKey::new(i32::MAX, 0).0 > AwaitedActionSortKey::new(i32::MAX - 1, 0).0
296
);
297
const_assert!(AwaitedActionSortKey::new(i32::MAX - 1, 0).0 > AwaitedActionSortKey::new(1, 0).0);
298
const_assert!(AwaitedActionSortKey::new(1, 0).0 > AwaitedActionSortKey::new(0, 0).0);
299
const_assert!(AwaitedActionSortKey::new(0, 0).0 > AwaitedActionSortKey::new(-1, 0).0);
300
const_assert!(AwaitedActionSortKey::new(-1, 0).0 > AwaitedActionSortKey::new(i32::MIN + 1, 0).0);
301
const_assert!(
302
    AwaitedActionSortKey::new(i32::MIN + 1, 0).0 > AwaitedActionSortKey::new(i32::MIN, 0).0
303
);
304
305
// Ensure the insert timestamp is used as the sort key second.
306
const_assert!(AwaitedActionSortKey::new(0, u32::MIN).0 > AwaitedActionSortKey::new(0, u32::MAX).0);