Coverage Report

Created: 2025-05-30 16:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/worker.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::hash::{Hash, Hasher};
16
use std::collections::HashMap;
17
use std::sync::Arc;
18
use std::time::{SystemTime, UNIX_EPOCH};
19
20
use nativelink_error::{Code, Error, ResultExt, make_err};
21
use nativelink_metric::MetricsComponent;
22
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
23
    ConnectionResult, StartExecute, UpdateForWorker, update_for_worker,
24
};
25
use nativelink_util::action_messages::{ActionInfo, OperationId, WorkerId};
26
use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime, FuncCounterWrapper};
27
use nativelink_util::platform_properties::{PlatformProperties, PlatformPropertyValue};
28
use tokio::sync::mpsc::UnboundedSender;
29
30
/// Timestamp type used to record when the scheduler last communicated with a worker
/// (stored in `Worker::last_update_timestamp`).
///
/// NOTE(review): the unit/epoch of this value is chosen by callers of `Worker::new` and is not
/// established in this file — presumably seconds since the UNIX epoch; confirm before relying
/// on it.
pub type WorkerTimestamp = u64;
31
32
/// Represents the action info and the platform properties of the action.
/// These platform properties have the type of the properties as well as
/// the value of the properties, unlike `ActionInfo`, which only has the
/// string value of the properties.
///
/// When an action is dispatched to a worker, the `Minimum` entries of
/// `platform_properties` are subtracted from the worker's own properties, and they are
/// added back when the action completes.
#[derive(Clone, Debug, MetricsComponent)]
pub struct ActionInfoWithProps {
    /// The action info of the action (shared, reference-counted).
    #[metric(group = "action_info")]
    pub inner: Arc<ActionInfo>,
    /// The typed platform properties of the action.
    #[metric(group = "platform_properties")]
    pub platform_properties: PlatformProperties,
}
45
46
/// Notifications to send worker about a requested state change.
///
/// Consumed by `Worker::notify_update`.
#[derive(Debug)]
pub enum WorkerUpdate {
    /// Requests that the worker begin executing this action.
    ///
    /// Carries the operation id together with the action and its typed platform properties.
    RunAction((OperationId, ActionInfoWithProps)),

    /// Request that the worker is no longer in the pool and may discard any jobs.
    Disconnect,
}
55
56
/// Bookkeeping for an action that has been dispatched to a worker and has not yet completed.
///
/// Entries are inserted into `Worker::running_action_infos` when an action is started and
/// removed when it completes.
#[derive(Debug, MetricsComponent)]
pub struct PendingActionInfoData {
    /// The dispatched action, including the platform properties that were subtracted from the
    /// worker and must be given back on completion.
    #[metric]
    pub action_info: ActionInfoWithProps,
}
61
62
/// Represents a connection to a worker and used as the medium to
/// interact with the worker from the client/scheduler.
///
/// Equality and hashing of a `Worker` consider only its `id`.
#[derive(Debug, MetricsComponent)]
pub struct Worker {
    /// Unique identifier of the worker.
    #[metric(help = "The unique identifier of the worker.")]
    pub id: WorkerId,

    /// Properties that describe the capabilities of this worker.
    ///
    /// `Minimum` values are decremented while actions run on this worker and incremented again
    /// when they complete, so they reflect the worker's remaining capacity.
    #[metric(group = "platform_properties")]
    pub platform_properties: PlatformProperties,

    /// Channel to send commands from scheduler to worker.
    pub tx: UnboundedSender<UpdateForWorker>,

    /// The action info of the running actions on the worker, keyed by operation id.
    #[metric(group = "running_action_infos")]
    pub running_action_infos: HashMap<OperationId, PendingActionInfoData>,

    /// Timestamp of last time this worker had been communicated with.
    // Warning: Do not update this timestamp without updating the placement of the worker in
    // the LRUCache in the Workers struct.
    #[metric(help = "Last time this worker was communicated with.")]
    pub last_update_timestamp: WorkerTimestamp,

    /// Whether the worker rejected the last action due to back pressure.
    /// Cleared whenever an action completes on this worker.
    #[metric(help = "If the worker is paused.")]
    pub is_paused: bool,

    /// Whether the worker is draining. A draining worker reports that it cannot accept work.
    #[metric(help = "If the worker is draining.")]
    pub is_draining: bool,

    /// Stats about the worker.
    #[metric]
    metrics: Arc<Metrics>,
}
99
100
66
fn send_msg_to_worker(
101
66
    tx: &UnboundedSender<UpdateForWorker>,
102
66
    msg: update_for_worker::Update,
103
66
) -> Result<(), Error> {
104
66
    tx.send(UpdateForWorker { update: Some(msg) })
105
66
        .map_err(|_| make_err!(
Code::Internal3
, "Worker disconnected"))
106
66
}
107
108
/// Reduces the platform properties available on the worker based on the platform properties provided.
109
/// This is used because we allow more than 1 job to run on a worker at a time, and this is how the
110
/// scheduler knows if more jobs can run on a given worker.
111
28
fn reduce_platform_properties(
112
28
    parent_props: &mut PlatformProperties,
113
28
    reduction_props: &PlatformProperties,
114
28
) {
115
28
    debug_assert!(
reduction_props0
.
is_satisfied_by0
(
parent_props0
));
116
32
    for (
property4
,
prop_value4
) in &reduction_props.properties {
117
4
        if let PlatformPropertyValue::Minimum(
value3
) = prop_value {
  Branch (117:16): [True: 3, False: 1]
  Branch (117:16): [Folded - Ignored]
118
3
            let worker_props = &mut parent_props.properties;
119
3
            if let &mut PlatformPropertyValue::Minimum(worker_value) =
  Branch (119:20): [True: 3, False: 0]
  Branch (119:20): [Folded - Ignored]
120
3
                &mut worker_props.get_mut(property).unwrap()
121
3
            {
122
3
                *worker_value -= value;
123
3
            
}0
124
1
        }
125
    }
126
28
}
127
128
impl Worker {
129
31
    pub fn new(
130
31
        id: WorkerId,
131
31
        platform_properties: PlatformProperties,
132
31
        tx: UnboundedSender<UpdateForWorker>,
133
31
        timestamp: WorkerTimestamp,
134
31
    ) -> Self {
135
31
        Self {
136
31
            id,
137
31
            platform_properties,
138
31
            tx,
139
31
            running_action_infos: HashMap::new(),
140
31
            last_update_timestamp: timestamp,
141
31
            is_paused: false,
142
31
            is_draining: false,
143
31
            metrics: Arc::new(Metrics {
144
31
                connected_timestamp: SystemTime::now()
145
31
                    .duration_since(UNIX_EPOCH)
146
31
                    .unwrap()
147
31
                    .as_secs(),
148
31
                actions_completed: CounterWithTime::default(),
149
31
                run_action: AsyncCounterWrapper::default(),
150
31
                keep_alive: FuncCounterWrapper::default(),
151
31
                notify_disconnect: CounterWithTime::default(),
152
31
            }),
153
31
        }
154
31
    }
155
156
    /// Sends the initial connection information to the worker. This generally is just meta info.
157
    /// This should only be sent once and should always be the first item in the stream.
158
31
    pub fn send_initial_connection_result(&mut self) -> Result<(), Error> {
159
31
        send_msg_to_worker(
160
31
            &self.tx,
161
31
            update_for_worker::Update::ConnectionResult(ConnectionResult {
162
31
                worker_id: self.id.clone().into(),
163
31
            }),
164
        )
165
31
        .err_tip(|| format!(
"Failed to send ConnectionResult to worker : {}"0
, self.id))
166
31
    }
167
168
    /// Notifies the worker of a requested state change.
169
34
    pub async fn notify_update(&mut self, worker_update: WorkerUpdate) -> Result<(), Error> {
170
34
        match worker_update {
171
28
            WorkerUpdate::RunAction((operation_id, action_info)) => {
172
28
                self.run_action(operation_id, action_info).await
173
            }
174
            WorkerUpdate::Disconnect => {
175
6
                self.metrics.notify_disconnect.inc();
176
6
                send_msg_to_worker(&self.tx, update_for_worker::Update::Disconnect(()))
177
            }
178
        }
179
34
    }
180
181
1
    pub fn keep_alive(&mut self) -> Result<(), Error> {
182
1
        let tx = &mut self.tx;
183
1
        let id = &self.id;
184
1
        self.metrics.keep_alive.wrap(move || {
185
1
            send_msg_to_worker(tx, update_for_worker::Update::KeepAlive(()))
186
1
                .err_tip(|| format!(
"Failed to send KeepAlive to worker : {id}"0
))
187
1
        })
188
1
    }
189
190
28
    async fn run_action(
191
28
        &mut self,
192
28
        operation_id: OperationId,
193
28
        action_info: ActionInfoWithProps,
194
28
    ) -> Result<(), Error> {
195
28
        let tx = &mut self.tx;
196
28
        let worker_platform_properties = &mut self.platform_properties;
197
28
        let running_action_infos = &mut self.running_action_infos;
198
28
        let worker_id = self.id.clone().into();
199
28
        self.metrics
200
28
            .run_action
201
28
            .wrap(async move {
202
28
                let action_info_clone = action_info.clone();
203
28
                let operation_id_string = operation_id.to_string();
204
28
                let start_execute = StartExecute {
205
28
                    execute_request: Some(action_info_clone.inner.as_ref().into()),
206
28
                    operation_id: operation_id_string,
207
28
                    queued_timestamp: Some(action_info.inner.insert_timestamp.into()),
208
28
                    platform: Some((&action_info.platform_properties).into()),
209
28
                    worker_id,
210
28
                };
211
28
                reduce_platform_properties(
212
28
                    worker_platform_properties,
213
28
                    &action_info.platform_properties,
214
                );
215
28
                running_action_infos.insert(operation_id, PendingActionInfoData { action_info });
216
217
28
                send_msg_to_worker(tx, update_for_worker::Update::StartAction(start_execute))
218
28
            })
219
28
            .await
220
28
    }
221
222
9
    pub(crate) async fn complete_action(
223
9
        &mut self,
224
9
        operation_id: &OperationId,
225
9
    ) -> Result<(), Error> {
226
9
        let pending_action_info = self.running_action_infos.remove(operation_id).err_tip(|| 
{0
227
0
            format!(
228
0
                "Worker {} tried to complete operation {} that was not running",
229
                self.id, operation_id
230
            )
231
0
        })?;
232
9
        self.restore_platform_properties(&pending_action_info.action_info.platform_properties);
233
9
        self.is_paused = false;
234
9
        self.metrics.actions_completed.inc();
235
9
        Ok(())
236
9
    }
237
238
0
    pub fn has_actions(&self) -> bool {
239
0
        !self.running_action_infos.is_empty()
240
0
    }
241
242
9
    fn restore_platform_properties(&mut self, props: &PlatformProperties) {
243
11
        for (
property2
,
prop_value2
) in &props.properties {
244
2
            if let PlatformPropertyValue::Minimum(value) = prop_value {
  Branch (244:20): [True: 2, False: 0]
  Branch (244:20): [Folded - Ignored]
245
2
                let worker_props = &mut self.platform_properties.properties;
246
2
                if let PlatformPropertyValue::Minimum(worker_value) =
  Branch (246:24): [True: 2, False: 0]
  Branch (246:24): [Folded - Ignored]
247
2
                    worker_props.get_mut(property).unwrap()
248
2
                {
249
2
                    *worker_value += value;
250
2
                
}0
251
0
            }
252
        }
253
9
    }
254
255
43
    pub const fn can_accept_work(&self) -> bool {
256
43
        !self.is_paused && !self.is_draining
  Branch (256:9): [True: 43, False: 0]
  Branch (256:9): [Folded - Ignored]
257
43
    }
258
}
259
260
impl PartialEq for Worker {
261
0
    fn eq(&self, other: &Self) -> bool {
262
0
        self.id == other.id
263
0
    }
264
}
265
266
// Id-based equality is declared a full equivalence relation; together with the id-only `Hash`
// impl this lets `Worker` be used where `Eq + Hash` is required (e.g. as a hash-set member).
// NOTE(review): relies on `WorkerId`'s own `==` being reflexive — confirm if that type changes.
impl Eq for Worker {}
267
268
impl Hash for Worker {
269
0
    fn hash<H: Hasher>(&self, state: &mut H) {
270
0
        self.id.hash(state);
271
0
    }
272
}
273
274
/// Per-worker statistics, exported through the `MetricsComponent` derive and shared via `Arc`
/// from `Worker::metrics`.
#[derive(Debug, Default, MetricsComponent)]
struct Metrics {
    /// Seconds since the UNIX epoch at which the `Worker` handle was created.
    #[metric(help = "The timestamp of when this worker connected.")]
    connected_timestamp: u64,
    /// Incremented each time an action completes on this worker.
    #[metric(help = "The number of actions completed for this worker.")]
    actions_completed: CounterWithTime,
    /// Wraps each action start sent to this worker.
    #[metric(help = "The number of actions started for this worker.")]
    run_action: AsyncCounterWrapper,
    /// Wraps each keep-alive sent to this worker.
    #[metric(help = "The number of keep_alive sent to this worker.")]
    keep_alive: FuncCounterWrapper,
    /// Incremented each time a disconnect notification is sent to this worker.
    #[metric(help = "The number of notify_disconnect sent to this worker.")]
    notify_disconnect: CounterWithTime,
}