Coverage Report

Created: 2026-03-06 12:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/worker.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::hash::{Hash, Hasher};
16
use std::collections::{HashMap, HashSet};
17
use std::sync::Arc;
18
use std::time::{SystemTime, UNIX_EPOCH};
19
20
use nativelink_error::{Code, Error, ResultExt, make_err};
21
use nativelink_metric::MetricsComponent;
22
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
23
    ConnectionResult, StartExecute, UpdateForWorker, update_for_worker,
24
};
25
use nativelink_util::action_messages::{ActionInfo, OperationId, WorkerId};
26
use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime, FuncCounterWrapper};
27
use nativelink_util::platform_properties::{PlatformProperties, PlatformPropertyValue};
28
use tokio::sync::mpsc::UnboundedSender;
29
30
/// Timestamp recorded for a worker's most recent communication; this is the
/// type of `Worker::last_update_timestamp`.
/// NOTE(review): the unit is chosen by callers of `Worker::new` — presumably
/// seconds since the UNIX epoch; confirm before relying on it.
pub type WorkerTimestamp = u64;
31
32
/// Represents the action info and the platform properties of the action.
33
/// These platform properties have the type of the properties as well as
34
/// the value of the properties, unlike `ActionInfo`, which only has the
35
/// string value of the properties.
36
#[derive(Clone, Debug, MetricsComponent)]
37
pub struct ActionInfoWithProps {
38
    /// The action info of the action.
39
    #[metric(group = "action_info")]
40
    pub inner: Arc<ActionInfo>,
41
    /// The platform properties of the action.
42
    #[metric(group = "platform_properties")]
43
    pub platform_properties: PlatformProperties,
44
}
45
46
/// Notifications to send worker about a requested state change.
47
#[derive(Debug)]
48
pub enum WorkerUpdate {
49
    /// Requests that the worker begin executing this action.
50
    RunAction((OperationId, ActionInfoWithProps)),
51
52
    /// Request that the worker is no longer in the pool and may discard any jobs.
53
    Disconnect,
54
}
55
56
#[derive(Debug, MetricsComponent)]
57
pub struct PendingActionInfoData {
58
    #[metric]
59
    pub action_info: ActionInfoWithProps,
60
}
61
62
/// Represents a connection to a worker and used as the medium to
63
/// interact with the worker from the client/scheduler.
64
#[derive(Debug, MetricsComponent)]
65
pub struct Worker {
66
    /// Unique identifier of the worker.
67
    #[metric(help = "The unique identifier of the worker.")]
68
    pub id: WorkerId,
69
70
    /// Properties that describe the capabilities of this worker.
71
    #[metric(group = "platform_properties")]
72
    pub platform_properties: PlatformProperties,
73
74
    /// Channel to send commands from scheduler to worker.
75
    pub tx: UnboundedSender<UpdateForWorker>,
76
77
    /// The action info of the running actions on the worker.
78
    #[metric(group = "running_action_infos")]
79
    pub running_action_infos: HashMap<OperationId, PendingActionInfoData>,
80
81
    /// If the properties were restored already then it's added to this set.
82
    pub restored_platform_properties: HashSet<OperationId>,
83
84
    /// Timestamp of last time this worker had been communicated with.
85
    // Warning: Do not update this timestamp without updating the placement of the worker in
86
    // the LRUCache in the Workers struct.
87
    #[metric(help = "Last time this worker was communicated with.")]
88
    pub last_update_timestamp: WorkerTimestamp,
89
90
    /// Whether the worker rejected the last action due to back pressure.
91
    #[metric(help = "If the worker is paused.")]
92
    pub is_paused: bool,
93
94
    /// Whether the worker is draining.
95
    #[metric(help = "If the worker is draining.")]
96
    pub is_draining: bool,
97
98
    /// Maximum inflight tasks for this worker (or 0 for unlimited)
99
    #[metric(help = "Maximum inflight tasks for this worker (or 0 for unlimited)")]
100
    pub max_inflight_tasks: u64,
101
102
    /// Stats about the worker.
103
    #[metric]
104
    metrics: Arc<Metrics>,
105
}
106
107
81
fn send_msg_to_worker(
108
81
    tx: &UnboundedSender<UpdateForWorker>,
109
81
    msg: update_for_worker::Update,
110
81
) -> Result<(), Error> {
111
81
    tx.send(UpdateForWorker { update: Some(msg) })
112
81
        .map_err(|_| make_err!(
Code::Internal7
, "Worker disconnected"))
113
81
}
114
115
/// Reduces the platform properties available on the worker based on the platform properties provided.
116
/// This is used because we allow more than 1 job to run on a worker at a time, and this is how the
117
/// scheduler knows if more jobs can run on a given worker.
118
33
fn reduce_platform_properties(
119
33
    parent_props: &mut PlatformProperties,
120
33
    reduction_props: &PlatformProperties,
121
33
) {
122
33
    debug_assert!(
reduction_props0
.
is_satisfied_by0
(
parent_props0
, false));
123
37
    for (
property4
,
prop_value4
) in &reduction_props.properties {
124
4
        if let PlatformPropertyValue::Minimum(
value3
) = prop_value {
  Branch (124:16): [True: 3, False: 1]
  Branch (124:16): [Folded - Ignored]
125
3
            let worker_props = &mut parent_props.properties;
126
3
            if let &mut PlatformPropertyValue::Minimum(worker_value) =
  Branch (126:20): [True: 3, False: 0]
  Branch (126:20): [Folded - Ignored]
127
3
                &mut worker_props.get_mut(property).unwrap()
128
3
            {
129
3
                *worker_value -= value;
130
3
            
}0
131
1
        }
132
    }
133
33
}
134
135
impl Worker {
136
37
    pub fn new(
137
37
        id: WorkerId,
138
37
        platform_properties: PlatformProperties,
139
37
        tx: UnboundedSender<UpdateForWorker>,
140
37
        timestamp: WorkerTimestamp,
141
37
        max_inflight_tasks: u64,
142
37
    ) -> Self {
143
37
        Self {
144
37
            id,
145
37
            platform_properties,
146
37
            tx,
147
37
            running_action_infos: HashMap::new(),
148
37
            restored_platform_properties: HashSet::new(),
149
37
            last_update_timestamp: timestamp,
150
37
            is_paused: false,
151
37
            is_draining: false,
152
37
            max_inflight_tasks,
153
37
            metrics: Arc::new(Metrics {
154
37
                connected_timestamp: SystemTime::now()
155
37
                    .duration_since(UNIX_EPOCH)
156
37
                    .unwrap()
157
37
                    .as_secs(),
158
37
                actions_completed: CounterWithTime::default(),
159
37
                run_action: AsyncCounterWrapper::default(),
160
37
                keep_alive: FuncCounterWrapper::default(),
161
37
                notify_disconnect: CounterWithTime::default(),
162
37
            }),
163
37
        }
164
37
    }
165
166
    /// Sends the initial connection information to the worker. This generally is just meta info.
167
    /// This should only be sent once and should always be the first item in the stream.
168
37
    pub fn send_initial_connection_result(&mut self) -> Result<(), Error> {
169
37
        send_msg_to_worker(
170
37
            &self.tx,
171
37
            update_for_worker::Update::ConnectionResult(ConnectionResult {
172
37
                worker_id: self.id.clone().into(),
173
37
            }),
174
        )
175
37
        .err_tip(|| format!(
"Failed to send ConnectionResult to worker : {}"0
, self.id))
176
37
    }
177
178
    /// Notifies the worker of a requested state change.
179
43
    pub async fn notify_update(&mut self, worker_update: WorkerUpdate) -> Result<(), Error> {
180
43
        match worker_update {
181
33
            WorkerUpdate::RunAction((operation_id, action_info)) => {
182
33
                self.run_action(operation_id, action_info).await
183
            }
184
            WorkerUpdate::Disconnect => {
185
10
                self.metrics.notify_disconnect.inc();
186
10
                send_msg_to_worker(&self.tx, update_for_worker::Update::Disconnect(()))
187
            }
188
        }
189
43
    }
190
191
1
    pub fn keep_alive(&mut self) -> Result<(), Error> {
192
1
        let tx = &mut self.tx;
193
1
        let id = &self.id;
194
1
        self.metrics.keep_alive.wrap(move || {
195
1
            send_msg_to_worker(tx, update_for_worker::Update::KeepAlive(()))
196
1
                .err_tip(|| format!(
"Failed to send KeepAlive to worker : {id}"0
))
197
1
        })
198
1
    }
199
200
33
    async fn run_action(
201
33
        &mut self,
202
33
        operation_id: OperationId,
203
33
        action_info: ActionInfoWithProps,
204
33
    ) -> Result<(), Error> {
205
33
        let tx = &mut self.tx;
206
33
        let worker_platform_properties = &mut self.platform_properties;
207
33
        let running_action_infos = &mut self.running_action_infos;
208
33
        let worker_id = self.id.clone().into();
209
33
        self.metrics
210
33
            .run_action
211
33
            .wrap(async move {
212
33
                let action_info_clone = action_info.clone();
213
33
                let operation_id_string = operation_id.to_string();
214
33
                let start_execute = StartExecute {
215
33
                    execute_request: Some(action_info_clone.inner.as_ref().into()),
216
33
                    operation_id: operation_id_string,
217
33
                    queued_timestamp: Some(action_info.inner.insert_timestamp.into()),
218
33
                    platform: Some((&action_info.platform_properties).into()),
219
33
                    worker_id,
220
33
                };
221
33
                reduce_platform_properties(
222
33
                    worker_platform_properties,
223
33
                    &action_info.platform_properties,
224
                );
225
33
                running_action_infos.insert(operation_id, PendingActionInfoData { action_info });
226
227
33
                send_msg_to_worker(tx, update_for_worker::Update::StartAction(start_execute))
228
33
            })
229
33
            .await
230
33
    }
231
232
0
    pub(crate) fn execution_complete(&mut self, operation_id: &OperationId) {
233
0
        if let Some((operation_id, pending_action_info)) =
  Branch (233:16): [True: 0, False: 0]
  Branch (233:16): [Folded - Ignored]
234
0
            self.running_action_infos.remove_entry(operation_id)
235
0
        {
236
0
            self.restored_platform_properties
237
0
                .insert(operation_id.clone());
238
0
            self.restore_platform_properties(&pending_action_info.action_info.platform_properties);
239
0
            self.running_action_infos
240
0
                .insert(operation_id, pending_action_info);
241
0
        }
242
0
    }
243
244
8
    pub(crate) async fn complete_action(
245
8
        &mut self,
246
8
        operation_id: &OperationId,
247
8
    ) -> Result<(), Error> {
248
8
        let pending_action_info = self.running_action_infos.remove(operation_id).err_tip(|| 
{0
249
0
            format!(
250
0
                "Worker {} tried to complete operation {} that was not running",
251
                self.id, operation_id
252
            )
253
0
        })?;
254
8
        if !self.restored_platform_properties.remove(operation_id) {
  Branch (254:12): [True: 8, False: 0]
  Branch (254:12): [Folded - Ignored]
255
8
            self.restore_platform_properties(&pending_action_info.action_info.platform_properties);
256
8
        
}0
257
8
        self.is_paused = false;
258
8
        self.metrics.actions_completed.inc();
259
8
        Ok(())
260
8
    }
261
262
0
    pub fn has_actions(&self) -> bool {
263
0
        !self.running_action_infos.is_empty()
264
0
    }
265
266
8
    fn restore_platform_properties(&mut self, props: &PlatformProperties) {
267
10
        for (
property2
,
prop_value2
) in &props.properties {
268
2
            if let PlatformPropertyValue::Minimum(value) = prop_value {
  Branch (268:20): [True: 2, False: 0]
  Branch (268:20): [Folded - Ignored]
269
2
                let worker_props = &mut self.platform_properties.properties;
270
2
                if let PlatformPropertyValue::Minimum(worker_value) =
  Branch (270:24): [True: 2, False: 0]
  Branch (270:24): [Folded - Ignored]
271
2
                    worker_props.get_mut(property).unwrap()
272
2
                {
273
2
                    *worker_value += value;
274
2
                
}0
275
0
            }
276
        }
277
8
    }
278
279
95
    pub fn can_accept_work(&self) -> bool {
280
95
        !self.is_paused
  Branch (280:9): [True: 95, False: 0]
  Branch (280:9): [Folded - Ignored]
281
95
            && !self.is_draining
  Branch (281:16): [True: 94, False: 1]
  Branch (281:16): [Folded - Ignored]
282
94
            && (self.max_inflight_tasks == 0
  Branch (282:17): [True: 91, False: 3]
  Branch (282:17): [Folded - Ignored]
283
3
                || u64::try_from(self.running_action_infos.len()).unwrap_or(u64::MAX)
284
3
                    < self.max_inflight_tasks)
285
95
    }
286
}
287
288
impl PartialEq for Worker {
289
0
    fn eq(&self, other: &Self) -> bool {
290
0
        self.id == other.id
291
0
    }
292
}
293
294
impl Eq for Worker {}
295
296
impl Hash for Worker {
297
0
    fn hash<H: Hasher>(&self, state: &mut H) {
298
0
        self.id.hash(state);
299
0
    }
300
}
301
302
#[derive(Debug, Default, MetricsComponent)]
303
struct Metrics {
304
    #[metric(help = "The timestamp of when this worker connected.")]
305
    connected_timestamp: u64,
306
    #[metric(help = "The number of actions completed for this worker.")]
307
    actions_completed: CounterWithTime,
308
    #[metric(help = "The number of actions started for this worker.")]
309
    run_action: AsyncCounterWrapper,
310
    #[metric(help = "The number of keep_alive sent to this worker.")]
311
    keep_alive: FuncCounterWrapper,
312
    #[metric(help = "The number of notify_disconnect sent to this worker.")]
313
    notify_disconnect: CounterWithTime,
314
}