Coverage Report

Created: 2025-10-30 00:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
use std::time::{SystemTime, UNIX_EPOCH};
17
18
use nativelink_error::{Error, ResultExt, make_input_err};
19
use nativelink_metric::{
20
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
21
};
22
use nativelink_util::action_messages::{
23
    ActionInfo, ActionStage, ActionState, OperationId, WorkerId,
24
};
25
use nativelink_util::origin_event::OriginMetadata;
26
use opentelemetry::baggage::BaggageExt;
27
use opentelemetry::context::Context;
28
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
29
use serde::{Deserialize, Serialize};
30
use static_assertions::{assert_eq_size, const_assert, const_assert_eq};
31
32
/// The version of the awaited action.
33
/// This number will always increment by one each time
34
/// the action is updated.
35
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
36
struct AwaitedActionVersion(i64);
37
38
impl MetricsComponent for AwaitedActionVersion {
39
0
    fn publish(
40
0
        &self,
41
0
        _kind: MetricKind,
42
0
        _field_metadata: MetricFieldData,
43
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
44
0
        Ok(MetricPublishKnownKindData::Counter(u64::from_ne_bytes(
45
0
            self.0.to_ne_bytes(),
46
0
        )))
47
0
    }
48
}
49
50
/// An action that is being awaited on and last known state.
51
#[derive(Debug, Clone, MetricsComponent, Serialize, Deserialize)]
52
pub struct AwaitedAction {
53
    /// The current version of the action.
54
    #[metric(help = "The version of the AwaitedAction")]
55
    version: AwaitedActionVersion,
56
57
    /// The action that is being awaited on.
58
    #[metric(help = "The action info of the AwaitedAction")]
59
    action_info: Arc<ActionInfo>,
60
61
    /// The operation id of the action.
62
    // If you need the client operation id, it may be set in
63
    // ActionState::operation_id.
64
    #[metric(help = "The operation id of the AwaitedAction")]
65
    operation_id: OperationId,
66
67
    /// The current sort key used to order the actions.
68
    #[metric(help = "The sort key of the AwaitedAction")]
69
    sort_key: AwaitedActionSortKey,
70
71
    /// The time the action was last updated.
72
    #[metric(help = "The last time the worker updated the AwaitedAction")]
73
    last_worker_updated_timestamp: SystemTime,
74
75
    /// The last time the client sent a keepalive message.
76
    #[metric(help = "The last time the client sent a keepalive message")]
77
    last_client_keepalive_timestamp: SystemTime,
78
79
    /// Worker that is currently running this action, None if unassigned.
80
    #[metric(help = "The worker id of the AwaitedAction")]
81
    worker_id: Option<WorkerId>,
82
83
    /// The current state of the action.
84
    #[metric(help = "The state of the AwaitedAction")]
85
    state: Arc<ActionState>,
86
87
    /// The origin metadata of the action.
88
    maybe_origin_metadata: Option<OriginMetadata>,
89
90
    /// Number of attempts the job has been tried.
91
    #[metric(help = "The number of attempts the AwaitedAction has been tried")]
92
    pub attempts: usize,
93
}
94
95
impl AwaitedAction {
96
37
    pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>, now: SystemTime) -> Self {
97
37
        let sort_key = AwaitedActionSortKey::new_with_unique_key(
98
37
            action_info.priority,
99
37
            &action_info.insert_timestamp,
100
        );
101
37
        let action_state = Arc::new(ActionState {
102
37
            stage: ActionStage::Queued,
103
37
            // Note: We don't use the real client_operation_id here because
104
37
            // the only place AwaitedAction::new should ever be called is
105
37
            // when the action is first created and this struct will be stored
106
37
            // in the database, so we don't want to accidentally leak the
107
37
            // client_operation_id to all clients.
108
37
            client_operation_id: operation_id.clone(),
109
37
            action_digest: action_info.unique_qualifier.digest(),
110
37
        });
111
112
37
        let ctx = Context::current();
113
37
        let baggage = ctx.baggage();
114
115
37
        let maybe_origin_metadata = if baggage.is_empty() {
  Branch (115:40): [True: 33, False: 4]
  Branch (115:40): [Folded - Ignored]
116
33
            None
117
        } else {
118
            Some(OriginMetadata {
119
4
                identity: baggage
120
4
                    .get(ENDUSER_ID)
121
4
                    .map(|v| v.as_str().to_string())
122
4
                    .unwrap_or_default(),
123
4
                bazel_metadata: None, // TODO(palfrey): Implement conversion.
124
            })
125
        };
126
127
37
        Self {
128
37
            version: AwaitedActionVersion(0),
129
37
            action_info,
130
37
            operation_id,
131
37
            sort_key,
132
37
            attempts: 0,
133
37
            last_worker_updated_timestamp: now,
134
37
            last_client_keepalive_timestamp: now,
135
37
            maybe_origin_metadata,
136
37
            worker_id: None,
137
37
            state: action_state,
138
37
        }
139
37
    }
140
141
97
    pub(crate) const fn version(&self) -> i64 {
142
97
        self.version.0
143
97
    }
144
145
41
    pub(crate) const fn set_version(&mut self, version: i64) {
146
41
        self.version = AwaitedActionVersion(version);
147
41
    }
148
149
39
    pub(crate) const fn increment_version(&mut self) {
150
39
        self.version = AwaitedActionVersion(self.version.0 + 1);
151
39
    }
152
153
202
    pub const fn action_info(&self) -> &Arc<ActionInfo> {
154
202
        &self.action_info
155
202
    }
156
157
136
    pub const fn operation_id(&self) -> &OperationId {
158
136
        &self.operation_id
159
136
    }
160
161
78
    pub(crate) const fn sort_key(&self) -> AwaitedActionSortKey {
162
78
        self.sort_key
163
78
    }
164
165
938
    pub const fn state(&self) -> &Arc<ActionState> {
166
938
        &self.state
167
938
    }
168
169
134
    pub(crate) const fn maybe_origin_metadata(&self) -> Option<&OriginMetadata> {
170
134
        self.maybe_origin_metadata.as_ref()
171
134
    }
172
173
65
    pub(crate) const fn worker_id(&self) -> Option<&WorkerId> {
174
65
        self.worker_id.as_ref()
175
65
    }
176
177
555
    pub(crate) const fn last_worker_updated_timestamp(&self) -> SystemTime {
178
555
        self.last_worker_updated_timestamp
179
555
    }
180
181
92
    pub(crate) const fn worker_keep_alive(&mut self, now: SystemTime) {
182
92
        self.last_worker_updated_timestamp = now;
183
92
    }
184
185
586
    pub(crate) const fn last_client_keepalive_timestamp(&self) -> SystemTime {
186
586
        self.last_client_keepalive_timestamp
187
586
    }
188
189
61
    pub(crate) const fn update_client_keep_alive(&mut self, now: SystemTime) {
190
61
        self.last_client_keepalive_timestamp = now;
191
61
    }
192
193
70
    pub(crate) fn set_client_operation_id(&mut self, client_operation_id: OperationId) {
194
70
        Arc::make_mut(&mut self.state).client_operation_id = client_operation_id;
195
70
    }
196
197
    /// Sets the worker id that is currently processing this action.
198
49
    pub(crate) fn set_worker_id(&mut self, new_maybe_worker_id: Option<WorkerId>, now: SystemTime) {
199
49
        if self.worker_id != new_maybe_worker_id {
  Branch (199:12): [True: 42, False: 7]
  Branch (199:12): [Folded - Ignored]
200
42
            self.worker_id = new_maybe_worker_id;
201
42
            self.worker_keep_alive(now);
202
42
        
}7
203
49
    }
204
205
    /// Sets the current state of the action and updates the last worker updated timestamp.
206
50
    pub fn worker_set_state(&mut self, mut state: Arc<ActionState>, now: SystemTime) {
207
50
        core::mem::swap(&mut self.state, &mut state);
208
50
        self.worker_keep_alive(now);
209
50
    }
210
}
211
212
impl TryFrom<&[u8]> for AwaitedAction {
213
    type Error = Error;
214
0
    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
215
0
        serde_json::from_slice(value)
216
0
            .map_err(|e| make_input_err!("{}", e.to_string()))
217
0
            .err_tip(|| "In AwaitedAction::TryFrom::&[u8]")
218
0
    }
219
}
220
221
/// The key used to sort the awaited actions.
222
///
223
/// The rules for sorting are as follows:
224
/// 1. priority of the action
225
/// 2. insert order of the action (lower = higher priority)
226
/// 3. (mostly random hash based on the action info)
227
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
228
#[repr(transparent)]
229
pub struct AwaitedActionSortKey(u64);
230
231
impl MetricsComponent for AwaitedActionSortKey {
232
0
    fn publish(
233
0
        &self,
234
0
        _kind: MetricKind,
235
0
        _field_metadata: MetricFieldData,
236
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
237
0
        Ok(MetricPublishKnownKindData::Counter(self.0))
238
0
    }
239
}
240
241
impl AwaitedActionSortKey {
242
37
    const fn new(priority: i32, insert_timestamp: u32) -> Self {
243
        // Shift the signed i32 range [i32::MIN, i32::MAX] to the unsigned u32 range
244
        // [0, u32::MAX] to preserve ordering when we convert to bytes for sorting.
245
37
        let priority_u32 = i32::MIN.unsigned_abs().wrapping_add_signed(priority);
246
37
        let priority = priority_u32.to_be_bytes();
247
248
        // Invert our timestamp so the larger the timestamp the lower the number.
249
        // This makes timestamp descending order instead of ascending.
250
37
        let timestamp = (insert_timestamp ^ u32::MAX).to_be_bytes();
251
252
37
        Self(u64::from_be_bytes([
253
37
            priority[0],
254
37
            priority[1],
255
37
            priority[2],
256
37
            priority[3],
257
37
            timestamp[0],
258
37
            timestamp[1],
259
37
            timestamp[2],
260
37
            timestamp[3],
261
37
        ]))
262
37
    }
263
264
37
    fn new_with_unique_key(priority: i32, insert_timestamp: &SystemTime) -> Self {
265
37
        let timestamp = u32::try_from(
266
37
            insert_timestamp
267
37
                .duration_since(UNIX_EPOCH)
268
37
                .unwrap()
269
37
                .as_secs(),
270
        )
271
37
        .unwrap_or(u32::MAX);
272
37
        Self::new(priority, timestamp)
273
37
    }
274
275
15
    pub(crate) const fn as_u64(self) -> u64 {
276
15
        self.0
277
15
    }
278
}
279
280
// Ensure the size of the sort key is the same as a `u64`.
281
assert_eq_size!(AwaitedActionSortKey, u64);
282
283
const_assert_eq!(
284
    AwaitedActionSortKey::new(0x1234_5678, 0x9abc_def0).0,
285
    // Note: Result has 0x12345678 + 0x80000000 = 0x92345678 because we need
286
    // to shift the `i32::MIN` value to be represented by zero.
287
    // Note: `6543210f` are the inverted bits of `9abcdef0`.
288
    // This effectively inverts the timestamp ordering so that, within the
289
    // same priority, the lowest timestamps produce the highest sort keys.
290
    AwaitedActionSortKey(0x9234_5678_6543_210f).0
291
);
292
// Ensure the priority is used as the sort key first.
293
const_assert!(
294
    AwaitedActionSortKey::new(i32::MAX, 0).0 > AwaitedActionSortKey::new(i32::MAX - 1, 0).0
295
);
296
const_assert!(AwaitedActionSortKey::new(i32::MAX - 1, 0).0 > AwaitedActionSortKey::new(1, 0).0);
297
const_assert!(AwaitedActionSortKey::new(1, 0).0 > AwaitedActionSortKey::new(0, 0).0);
298
const_assert!(AwaitedActionSortKey::new(0, 0).0 > AwaitedActionSortKey::new(-1, 0).0);
299
const_assert!(AwaitedActionSortKey::new(-1, 0).0 > AwaitedActionSortKey::new(i32::MIN + 1, 0).0);
300
const_assert!(
301
    AwaitedActionSortKey::new(i32::MIN + 1, 0).0 > AwaitedActionSortKey::new(i32::MIN, 0).0
302
);
303
304
// Ensure the insert timestamp is used as the sort key second.
305
const_assert!(AwaitedActionSortKey::new(0, u32::MIN).0 > AwaitedActionSortKey::new(0, u32::MAX).0);