Coverage Report

Created: 2026-06-12 23:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/worker.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::hash::{Hash, Hasher};
16
use std::collections::{HashMap, HashSet};
17
use std::sync::Arc;
18
use std::time::{SystemTime, UNIX_EPOCH};
19
20
use nativelink_error::{Code, Error, ResultExt};
21
use nativelink_metric::MetricsComponent;
22
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
23
    ConnectionResult, StartExecute, UpdateForWorker, update_for_worker,
24
};
25
use nativelink_util::action_messages::{ActionInfo, OperationId, WorkerId};
26
use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime, FuncCounterWrapper};
27
use nativelink_util::origin_event::OriginMetadata;
28
use nativelink_util::platform_properties::{PlatformProperties, PlatformPropertyValue};
29
use tokio::sync::mpsc::UnboundedSender;
30
31
/// Integer timestamp type used for a worker's `last_update_timestamp`.
pub type WorkerTimestamp = u64;
32
33
/// Represents the action info and the platform properties of the action.
34
/// These platform properties have the type of the properties as well as
35
/// the value of the properties, unlike `ActionInfo`, which only has the
36
/// string value of the properties.
37
#[derive(Clone, Debug, MetricsComponent)]
38
pub struct ActionInfoWithProps {
39
    /// The action info of the action.
40
    #[metric(group = "action_info")]
41
    pub inner: Arc<ActionInfo>,
42
    /// The platform properties of the action.
43
    #[metric(group = "platform_properties")]
44
    pub platform_properties: PlatformProperties,
45
    /// Origin metadata used when publishing scheduler-side telemetry for this action.
46
    pub origin_metadata: OriginMetadata,
47
    /// `OriginEvent` id for the `scheduler_start_execute` request.
48
    pub scheduler_start_execute_event_id: Option<String>,
49
}
50
51
/// Notifications to send worker about a requested state change.
52
#[derive(Debug)]
53
pub enum WorkerUpdate {
54
    /// Requests that the worker begin executing this action.
55
    RunAction(Box<(OperationId, ActionInfoWithProps)>),
56
57
    /// Request that the worker is no longer in the pool and may discard any jobs.
58
    Disconnect,
59
}
60
61
#[derive(Debug, MetricsComponent)]
62
pub struct PendingActionInfoData {
63
    #[metric]
64
    pub action_info: ActionInfoWithProps,
65
}
66
67
/// Represents a connection to a worker and used as the medium to
68
/// interact with the worker from the client/scheduler.
69
#[derive(Debug, MetricsComponent)]
70
pub struct Worker {
71
    /// Unique identifier of the worker.
72
    #[metric(help = "The unique identifier of the worker.")]
73
    pub id: WorkerId,
74
75
    /// Properties that describe the capabilities of this worker.
76
    #[metric(group = "platform_properties")]
77
    pub platform_properties: PlatformProperties,
78
79
    /// Channel to send commands from scheduler to worker.
80
    pub tx: UnboundedSender<UpdateForWorker>,
81
82
    /// The action info of the running actions on the worker.
83
    #[metric(group = "running_action_infos")]
84
    pub running_action_infos: HashMap<OperationId, PendingActionInfoData>,
85
86
    /// If the properties were restored already then it's added to this set.
87
    pub restored_platform_properties: HashSet<OperationId>,
88
89
    /// Timestamp of last time this worker had been communicated with.
90
    // Warning: Do not update this timestamp without updating the placement of the worker in
91
    // the LRUCache in the Workers struct.
92
    #[metric(help = "Last time this worker was communicated with.")]
93
    pub last_update_timestamp: WorkerTimestamp,
94
95
    /// Whether the worker rejected the last action due to back pressure.
96
    #[metric(help = "If the worker is paused.")]
97
    pub is_paused: bool,
98
99
    /// Whether the worker is draining.
100
    #[metric(help = "If the worker is draining.")]
101
    pub is_draining: bool,
102
103
    /// Maximum inflight tasks for this worker (or 0 for unlimited)
104
    #[metric(help = "Maximum inflight tasks for this worker (or 0 for unlimited)")]
105
    pub max_inflight_tasks: u64,
106
107
    /// Stats about the worker.
108
    #[metric]
109
    metrics: Arc<Metrics>,
110
}
111
112
87
fn send_msg_to_worker(
113
87
    tx: &UnboundedSender<UpdateForWorker>,
114
87
    msg: update_for_worker::Update,
115
87
) -> Result<(), Error> {
116
87
    tx.send(UpdateForWorker { update: Some(msg) })
117
87
        .map_err(|err| 
Error::from_std_err7
(
Code::Internal7
,
&err7
).
append7
("Worker disconnected"))
118
87
}
119
120
/// Reduces the platform properties available on the worker based on the platform properties provided.
121
/// This is used because we allow more than 1 job to run on a worker at a time, and this is how the
122
/// scheduler knows if more jobs can run on a given worker.
123
36
fn reduce_platform_properties(
124
36
    parent_props: &mut PlatformProperties,
125
36
    reduction_props: &PlatformProperties,
126
36
) {
127
36
    debug_assert!(
reduction_props0
.
is_satisfied_by0
(
parent_props0
, false));
128
36
    for (
property6
,
prop_value6
) in &reduction_props.properties {
129
6
        if let PlatformPropertyValue::Minimum(
value5
) = prop_value {
130
5
            let worker_props = &mut parent_props.properties;
131
5
            if let &mut PlatformPropertyValue::Minimum(worker_value) =
132
5
                &mut worker_props.get_mut(property).unwrap()
133
5
            {
134
5
                *worker_value -= value;
135
5
            
}0
136
1
        }
137
    }
138
36
}
139
140
impl Worker {
141
40
    pub fn new(
142
40
        id: WorkerId,
143
40
        platform_properties: PlatformProperties,
144
40
        tx: UnboundedSender<UpdateForWorker>,
145
40
        timestamp: WorkerTimestamp,
146
40
        max_inflight_tasks: u64,
147
40
    ) -> Self {
148
40
        Self {
149
40
            id,
150
40
            platform_properties,
151
40
            tx,
152
40
            running_action_infos: HashMap::new(),
153
40
            restored_platform_properties: HashSet::new(),
154
40
            last_update_timestamp: timestamp,
155
40
            is_paused: false,
156
40
            is_draining: false,
157
40
            max_inflight_tasks,
158
40
            metrics: Arc::new(Metrics {
159
40
                connected_timestamp: SystemTime::now()
160
40
                    .duration_since(UNIX_EPOCH)
161
40
                    .unwrap()
162
40
                    .as_secs(),
163
40
                actions_completed: CounterWithTime::default(),
164
40
                run_action: AsyncCounterWrapper::default(),
165
40
                keep_alive: FuncCounterWrapper::default(),
166
40
                notify_disconnect: CounterWithTime::default(),
167
40
            }),
168
40
        }
169
40
    }
170
171
    /// Sends the initial connection information to the worker. This generally is just meta info.
172
    /// This should only be sent once and should always be the first item in the stream.
173
40
    pub fn send_initial_connection_result(&mut self) -> Result<(), Error> {
174
40
        send_msg_to_worker(
175
40
            &self.tx,
176
40
            update_for_worker::Update::ConnectionResult(ConnectionResult {
177
40
                worker_id: self.id.clone().into(),
178
40
            }),
179
        )
180
40
        .err_tip(|| 
format!0
("Failed to send ConnectionResult to worker : {}", self.id))
181
40
    }
182
183
    /// Notifies the worker of a requested state change.
184
46
    pub async fn notify_update(&mut self, worker_update: WorkerUpdate) -> Result<(), Error> {
185
46
        match worker_update {
186
36
            WorkerUpdate::RunAction(action) => {
187
36
                let (operation_id, action_info) = *action;
188
36
                self.run_action(operation_id, action_info).await
189
            }
190
            WorkerUpdate::Disconnect => {
191
10
                self.metrics.notify_disconnect.inc();
192
10
                send_msg_to_worker(&self.tx, update_for_worker::Update::Disconnect(()))
193
            }
194
        }
195
46
    }
196
197
1
    pub fn keep_alive(&mut self) -> Result<(), Error> {
198
1
        let tx = &mut self.tx;
199
1
        let id = &self.id;
200
1
        self.metrics.keep_alive.wrap(move || {
201
1
            send_msg_to_worker(tx, update_for_worker::Update::KeepAlive(()))
202
1
                .err_tip(|| 
format!0
("Failed to send KeepAlive to worker : {id}"))
203
1
        })
204
1
    }
205
206
36
    async fn run_action(
207
36
        &mut self,
208
36
        operation_id: OperationId,
209
36
        action_info: ActionInfoWithProps,
210
36
    ) -> Result<(), Error> {
211
36
        let tx = &mut self.tx;
212
36
        let worker_platform_properties = &mut self.platform_properties;
213
36
        let running_action_infos = &mut self.running_action_infos;
214
36
        let worker_id = self.id.clone().into();
215
36
        self.metrics
216
36
            .run_action
217
36
            .wrap(async move {
218
36
                let action_info_clone = action_info.clone();
219
36
                let operation_id_string = operation_id.to_string();
220
36
                let start_execute = StartExecute {
221
36
                    execute_request: Some(action_info_clone.inner.as_ref().into()),
222
36
                    operation_id: operation_id_string,
223
36
                    queued_timestamp: Some(action_info.inner.insert_timestamp.into()),
224
36
                    platform: Some((&action_info.platform_properties).into()),
225
36
                    worker_id,
226
36
                };
227
36
                reduce_platform_properties(
228
36
                    worker_platform_properties,
229
36
                    &action_info.platform_properties,
230
                );
231
36
                running_action_infos.insert(operation_id, PendingActionInfoData { action_info });
232
233
36
                send_msg_to_worker(tx, update_for_worker::Update::StartAction(start_execute))
234
36
            })
235
36
            .await
236
36
    }
237
238
0
    pub(crate) fn execution_complete(&mut self, operation_id: &OperationId) {
239
0
        if let Some((operation_id, pending_action_info)) =
240
0
            self.running_action_infos.remove_entry(operation_id)
241
0
        {
242
0
            self.restored_platform_properties
243
0
                .insert(operation_id.clone());
244
0
            self.restore_platform_properties(&pending_action_info.action_info.platform_properties);
245
0
            self.running_action_infos
246
0
                .insert(operation_id, pending_action_info);
247
0
        }
248
0
    }
249
250
10
    pub(crate) async fn complete_action(
251
10
        &mut self,
252
10
        operation_id: &OperationId,
253
10
    ) -> Result<(), Error> {
254
10
        let pending_action_info = self.running_action_infos.remove(operation_id).err_tip(|| 
{0
255
0
            format!(
256
                "Worker {} tried to complete operation {} that was not running",
257
                self.id, operation_id
258
            )
259
0
        })?;
260
10
        if !self.restored_platform_properties.remove(operation_id) {
261
10
            self.restore_platform_properties(&pending_action_info.action_info.platform_properties);
262
10
        
}0
263
10
        self.is_paused = false;
264
10
        self.metrics.actions_completed.inc();
265
10
        Ok(())
266
10
    }
267
268
0
    pub fn has_actions(&self) -> bool {
269
0
        !self.running_action_infos.is_empty()
270
0
    }
271
272
10
    fn restore_platform_properties(&mut self, props: &PlatformProperties) {
273
10
        for (
property2
,
prop_value2
) in &props.properties {
274
2
            if let PlatformPropertyValue::Minimum(value) = prop_value {
275
2
                let worker_props = &mut self.platform_properties.properties;
276
2
                if let PlatformPropertyValue::Minimum(worker_value) =
277
2
                    worker_props.get_mut(property).unwrap()
278
2
                {
279
2
                    *worker_value += value;
280
2
                
}0
281
0
            }
282
        }
283
10
    }
284
285
103
    pub fn can_accept_work(&self) -> bool {
286
103
        !self.is_paused
287
103
            && !self.is_draining
288
102
            && (self.max_inflight_tasks == 0
289
3
                || u64::try_from(self.running_action_infos.len()).unwrap_or(u64::MAX)
290
3
                    < self.max_inflight_tasks)
291
103
    }
292
}
293
294
impl PartialEq for Worker {
295
0
    fn eq(&self, other: &Self) -> bool {
296
0
        self.id == other.id
297
0
    }
298
}
299
300
impl Eq for Worker {}
301
302
impl Hash for Worker {
303
0
    fn hash<H: Hasher>(&self, state: &mut H) {
304
0
        self.id.hash(state);
305
0
    }
306
}
307
308
#[derive(Debug, Default, MetricsComponent)]
309
struct Metrics {
310
    #[metric(help = "The timestamp of when this worker connected.")]
311
    connected_timestamp: u64,
312
    #[metric(help = "The number of actions completed for this worker.")]
313
    actions_completed: CounterWithTime,
314
    #[metric(help = "The number of actions started for this worker.")]
315
    run_action: AsyncCounterWrapper,
316
    #[metric(help = "The number of keep_alive sent to this worker.")]
317
    keep_alive: FuncCounterWrapper,
318
    #[metric(help = "The number of notify_disconnect sent to this worker.")]
319
    notify_disconnect: CounterWithTime,
320
}