Coverage Report

Created: 2025-05-30 16:37

/build/source/nativelink-scheduler/src/api_worker_scheduler.rs
Line | Count | Source
   1 |       | // Copyright 2024 The NativeLink Authors. All rights reserved.
   2 |       | //
   3 |       | // Licensed under the Apache License, Version 2.0 (the "License");
   4 |       | // you may not use this file except in compliance with the License.
   5 |       | // You may obtain a copy of the License at
   6 |       | //
   7 |       | //    http://www.apache.org/licenses/LICENSE-2.0
   8 |       | //
   9 |       | // Unless required by applicable law or agreed to in writing, software
  10 |       | // distributed under the License is distributed on an "AS IS" BASIS,
  11 |       | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12 |       | // See the License for the specific language governing permissions and
  13 |       | // limitations under the License.
  14 |       |
  15 |       | use core::ops::{Deref, DerefMut};
  16 |       | use std::sync::Arc;
  17 |       |
  18 |       | use async_lock::Mutex;
  19 |       | use lru::LruCache;
  20 |       | use nativelink_config::schedulers::WorkerAllocationStrategy;
  21 |       | use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err};
  22 |       | use nativelink_metric::{
  23 |       |     MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
  24 |       |     RootMetricsComponent, group,
  25 |       | };
  26 |       | use nativelink_util::action_messages::{OperationId, WorkerId};
  27 |       | use nativelink_util::operation_state_manager::{UpdateOperationType, WorkerStateManager};
  28 |       | use nativelink_util::platform_properties::PlatformProperties;
  29 |       | use nativelink_util::spawn;
  30 |       | use nativelink_util::task::JoinHandleDropGuard;
  31 |       | use tokio::sync::Notify;
  32 |       | use tokio::sync::mpsc::{self, UnboundedSender};
  33 |       | use tonic::async_trait;
  34 |       | use tracing::{error, warn};
  35 |       |
  36 |       | use crate::platform_property_manager::PlatformPropertyManager;
  37 |       | use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp, WorkerUpdate};
  38 |       | use crate::worker_scheduler::WorkerScheduler;
  39 |       |
  40 |       | #[derive(Debug)]
  41 |       | struct Workers(LruCache<WorkerId, Worker>);
  42 |       |
  43 |       | impl Deref for Workers {
  44 |       |     type Target = LruCache<WorkerId, Worker>;
  45 |       |
  46 |    57 |     fn deref(&self) -> &Self::Target {
  47 |    57 |         &self.0
  48 |    57 |     }
  49 |       | }
  50 |       |
  51 |       | impl DerefMut for Workers {
  52 |   109 |     fn deref_mut(&mut self) -> &mut Self::Target {
  53 |   109 |         &mut self.0
  54 |   109 |     }
  55 |       | }
  56 |       |
  57 |       | // Note: This could not be a derive macro because this derive-macro
  58 |       | // does not support LruCache and nameless field structs.
  59 |       | impl MetricsComponent for Workers {
  60 |     0 |     fn publish(
  61 |     0 |         &self,
  62 |     0 |         _kind: MetricKind,
  63 |     0 |         _field_metadata: MetricFieldData,
  64 |     0 |     ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
  65 |     0 |         let _enter = group!("workers").entered();
  66 |     0 |         for (worker_id, worker) in self.iter() {
  67 |     0 |             let _enter = group!(worker_id).entered();
  68 |     0 |             worker.publish(MetricKind::Component, MetricFieldData::default())?;
  69 |       |         }
  70 |     0 |         Ok(MetricPublishKnownKindData::Component)
  71 |     0 |     }
  72 |       | }
  73 |       |
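The `Workers` type above is a tuple struct wrapping `LruCache<WorkerId, Worker>`, with `Deref`/`DerefMut` forwarding so the rest of the file can call cache methods on it directly while the tuple field (`self.workers.0`) stays reachable when the concrete type is needed. A minimal, self-contained sketch of that newtype pattern, using a plain `HashMap` and an illustrative `Registry` type that is not part of the measured source:

use core::ops::{Deref, DerefMut};
use std::collections::HashMap;

// Illustrative stand-in for `Workers(LruCache<WorkerId, Worker>)`: the newtype
// forwards method calls to the inner map via Deref/DerefMut, while the tuple
// field (`.0`) remains reachable for code that needs the concrete type.
struct Registry(HashMap<String, u32>);

impl Deref for Registry {
    type Target = HashMap<String, u32>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for Registry {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

fn main() {
    let mut registry = Registry(HashMap::new());
    registry.insert("worker-a".to_string(), 1); // Resolved through DerefMut.
    assert_eq!(registry.get("worker-a"), Some(&1)); // Resolved through Deref.
    assert_eq!(registry.0.len(), 1); // Direct access to the inner map.
}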
  74 |       | /// A collection of workers that are available to run tasks.
  75 |       | #[derive(MetricsComponent)]
  76 |       | struct ApiWorkerSchedulerImpl {
  77 |       |     /// A `LruCache` of workers available based on `allocation_strategy`.
  78 |       |     #[metric(group = "workers")]
  79 |       |     workers: Workers,
  80 |       |
  81 |       |     /// The worker state manager.
  82 |       |     #[metric(group = "worker_state_manager")]
  83 |       |     worker_state_manager: Arc<dyn WorkerStateManager>,
  84 |       |     /// The allocation strategy for workers.
  85 |       |     allocation_strategy: WorkerAllocationStrategy,
  86 |       |     /// A channel to notify the matching engine that the worker pool has changed.
  87 |       |     worker_change_notify: Arc<Notify>,
  88 |       |     /// A channel to notify that an operation is still alive.
  89 |       |     operation_keep_alive_tx: UnboundedSender<(OperationId, WorkerId)>,
  90 |       | }
  91 |       |
  92 |       | impl core::fmt::Debug for ApiWorkerSchedulerImpl {
  93 |     0 |     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
  94 |     0 |         f.debug_struct("ApiWorkerSchedulerImpl")
  95 |     0 |             .field("workers", &self.workers)
  96 |     0 |             .field("allocation_strategy", &self.allocation_strategy)
  97 |     0 |             .field("worker_change_notify", &self.worker_change_notify)
  98 |     0 |             .field("operation_keep_alive_tx", &self.operation_keep_alive_tx)
  99 |     0 |             .finish_non_exhaustive()
 100 |     0 |     }
 101 |       | }
 102 |       |
 103 |       | impl ApiWorkerSchedulerImpl {
 104 |       |     /// Refreshes the lifetime of the worker with the given timestamp.
 105 |     2 |     fn refresh_lifetime(
 106 |     2 |         &mut self,
 107 |     2 |         worker_id: &WorkerId,
 108 |     2 |         timestamp: WorkerTimestamp,
 109 |     2 |     ) -> Result<(), Error> {
 110 |     2 |         let worker = self.workers.0.peek_mut(worker_id).ok_or_else(|| {
 111 |     0 |             make_input_err!(
 112 |       |                 "Worker not found in worker map in refresh_lifetime() {}",
 113 |       |                 worker_id
 114 |       |             )
 115 |     0 |         })?;
 116 |     0 |         error_if!(
 117 |     2 |             worker.last_update_timestamp > timestamp,
     |       |   Branch (117:13): [True: 0, False: 2]
     |       |   Branch (117:13): [Folded - Ignored]
 118 |       |             "Worker already had a timestamp of {}, but tried to update it with {}",
 119 |       |             worker.last_update_timestamp,
 120 |       |             timestamp
 121 |       |         );
 122 |     2 |         worker.last_update_timestamp = timestamp;
 123 |     2 |         for operation_id in worker.running_action_infos.keys() {
 124 |     0 |             if self
     |       |   Branch (124:16): [True: 0, False: 0]
     |       |   Branch (124:16): [Folded - Ignored]
 125 |     0 |                 .operation_keep_alive_tx
 126 |     0 |                 .send((operation_id.clone(), worker_id.clone()))
 127 |     0 |                 .is_err()
 128 |       |             {
 129 |     0 |                 error!(
 130 |       |                     ?operation_id,
 131 |       |                     ?worker_id,
 132 |     0 |                     "OperationKeepAliveTx stream closed"
 133 |       |                 );
 134 |     0 |             }
 135 |       |         }
 136 |     2 |         Ok(())
 137 |     2 |     }
 138 |       |
 139 |       |     /// Adds a worker to the pool.
 140 |       |     /// Note: This function will not do any task matching.
 141 |    31 |     fn add_worker(&mut self, worker: Worker) -> Result<(), Error> {
 142 |    31 |         let worker_id = worker.id.clone();
 143 |    31 |         self.workers.put(worker_id.clone(), worker);
 144 |       |
 145 |       |         // Worker is not cloneable, and we do not want to send the initial connection results until
 146 |       |         // we have added it to the map, or we might get some strange race conditions due to the way
 147 |       |         // the multi-threaded runtime works.
 148 |    31 |         let worker = self.workers.peek_mut(&worker_id).unwrap();
 149 |    31 |         let res = worker
 150 |    31 |             .send_initial_connection_result()
 151 |    31 |             .err_tip(|| "Failed to send initial connection result to worker");
 152 |    31 |         if let Err(err) = &res {
     |       |   Branch (152:16): [True: 0, False: 31]
     |       |   Branch (152:16): [Folded - Ignored]
 153 |     0 |             error!(
 154 |       |                 ?worker_id,
 155 |       |                 ?err,
 156 |     0 |                 "Worker connection appears to have been closed while adding to pool"
 157 |       |             );
 158 |    31 |         }
 159 |    31 |         self.worker_change_notify.notify_one();
 160 |    31 |         res
 161 |    31 |     }
 162 |       |
 163 |       |     /// Removes worker from pool.
 164 |       |     /// Note: The caller is responsible for any rescheduling of any tasks that might be
 165 |       |     /// running.
 166 |     6 |     fn remove_worker(&mut self, worker_id: &WorkerId) -> Option<Worker> {
 167 |     6 |         let result = self.workers.pop(worker_id);
 168 |     6 |         self.worker_change_notify.notify_one();
 169 |     6 |         result
 170 |     6 |     }
 171 |       |
 172 |       |     /// Sets if the worker is draining or not.
 173 |     2 |     async fn set_drain_worker(
 174 |     2 |         &mut self,
 175 |     2 |         worker_id: &WorkerId,
 176 |     2 |         is_draining: bool,
 177 |     2 |     ) -> Result<(), Error> {
 178 |     2 |         let worker = self
 179 |     2 |             .workers
 180 |     2 |             .get_mut(worker_id)
 181 |     2 |             .err_tip(|| format!("Worker {worker_id} doesn't exist in the pool"))?;
 182 |     2 |         worker.is_draining = is_draining;
 183 |     2 |         self.worker_change_notify.notify_one();
 184 |     2 |         Ok(())
 185 |     2 |     }
 186 |       |
 187 |    45 |     fn inner_find_worker_for_action(
 188 |    45 |         &self,
 189 |    45 |         platform_properties: &PlatformProperties,
 190 |    45 |     ) -> Option<WorkerId> {
 191 |    45 |         let mut workers_iter = self.workers.iter();
 192 |    45 |         let workers_iter = match self.allocation_strategy {
 193 |       |             // Use rfind to get the least recently used that satisfies the properties.
 194 |    45 |             WorkerAllocationStrategy::LeastRecentlyUsed => workers_iter.rfind(|(_, w)| {
 195 |    34 |                 w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
     |       |   Branch (195:17): [True: 33, False: 1]
     |       |   Branch (195:17): [Folded - Ignored]
 196 |    34 |             }),
 197 |       |             // Use find to get the most recently used that satisfies the properties.
 198 |     0 |             WorkerAllocationStrategy::MostRecentlyUsed => workers_iter.find(|(_, w)| {
 199 |     0 |                 w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
     |       |   Branch (199:17): [True: 0, False: 0]
     |       |   Branch (199:17): [Folded - Ignored]
 200 |     0 |             }),
 201 |       |         };
 202 |    45 |         workers_iter.map(|(_, w)| w.id.clone())
 203 |    45 |     }
 204 |       |
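The match in `inner_find_worker_for_action` leans on `lru::LruCache` iteration order: `iter()` yields entries from most recently used to least recently used, so `find` returns the most recently used worker that satisfies the properties and `rfind` the least recently used one. A minimal sketch of that ordering, where the worker names and the trivial `|_| true` predicate are illustrative only:

use lru::LruCache;

fn main() {
    let mut workers: LruCache<&str, ()> = LruCache::unbounded();
    workers.put("worker-a", ());
    workers.put("worker-b", ());
    workers.put("worker-c", ());

    // Touching an entry moves it to the most-recently-used position.
    let _ = workers.get(&"worker-a");

    // `iter()` walks from most to least recently used, so `find` picks the
    // MostRecentlyUsed candidate and `rfind` the LeastRecentlyUsed one.
    let most_recent = workers.iter().find(|_| true).map(|(id, _)| *id);
    let least_recent = workers.iter().rfind(|_| true).map(|(id, _)| *id);

    assert_eq!(most_recent, Some("worker-a"));
    assert_eq!(least_recent, Some("worker-b"));
}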
 205 |    10 |     async fn update_action(
 206 |    10 |         &mut self,
 207 |    10 |         worker_id: &WorkerId,
 208 |    10 |         operation_id: &OperationId,
 209 |    10 |         update: UpdateOperationType,
 210 |    10 |     ) -> Result<(), Error> {
 211 |    10 |         let worker = self.workers.get_mut(worker_id).err_tip(|| {
 212 |     0 |             format!("Worker {worker_id} does not exist in SimpleScheduler::update_action")
 213 |     0 |         })?;
 214 |       |
 215 |       |         // Ensure the worker is supposed to be running the operation.
 216 |    10 |         if !worker.running_action_infos.contains_key(operation_id) {
     |       |   Branch (216:12): [True: 1, False: 9]
     |       |   Branch (216:12): [Folded - Ignored]
 217 |     1 |             let err = make_err!(
 218 |     1 |                 Code::Internal,
 219 |       |                 "Operation {operation_id} should not be running on worker {worker_id} in SimpleScheduler::update_action"
 220 |       |             );
 221 |     1 |             return Result::<(), _>::Err(err.clone())
 222 |     1 |                 .merge(self.immediate_evict_worker(worker_id, err).await);
 223 |     9 |         }
 224 |       |
 225 |     9 |         let (is_finished, due_to_backpressure) = match &update {
 226 |     6 |             UpdateOperationType::UpdateWithActionStage(action_stage) => {
 227 |     6 |                 (action_stage.is_finished(), false)
 228 |       |             }
 229 |     0 |             UpdateOperationType::KeepAlive => (false, false),
 230 |     3 |             UpdateOperationType::UpdateWithError(err) => {
 231 |     3 |                 (true, err.code == Code::ResourceExhausted)
 232 |       |             }
 233 |       |         };
 234 |       |
 235 |       |         // Update the operation in the worker state manager.
 236 |       |         {
 237 |     9 |             let update_operation_res = self
 238 |     9 |                 .worker_state_manager
 239 |     9 |                 .update_operation(operation_id, worker_id, update)
 240 |     9 |                 .await
 241 |     9 |                 .err_tip(|| "in update_operation on SimpleScheduler::update_action");
 242 |     9 |             if let Err(err) = update_operation_res {
     |       |   Branch (242:20): [True: 0, False: 9]
     |       |   Branch (242:20): [Folded - Ignored]
 243 |     0 |                 error!(
 244 |       |                     ?operation_id,
 245 |       |                     ?worker_id,
 246 |       |                     ?err,
 247 |     0 |                     "Failed to update_operation on update_action"
 248 |       |                 );
 249 |     0 |                 return Err(err);
 250 |     9 |             }
 251 |       |         }
 252 |       |
 253 |     9 |         if !is_finished {
     |       |   Branch (253:12): [True: 0, False: 9]
     |       |   Branch (253:12): [Folded - Ignored]
 254 |     0 |             return Ok(());
 255 |     9 |         }
 256 |       |
 257 |       |         // Clear this action from the current worker if finished.
 258 |     9 |         let complete_action_res = {
 259 |     9 |             let was_paused = !worker.can_accept_work();
 260 |       |
 261 |       |             // Note: We need to run this before dealing with backpressure logic.
 262 |     9 |             let complete_action_res = worker.complete_action(operation_id).await;
 263 |       |
 264 |       |             // Only pause if there's an action still waiting that will unpause.
 265 |     9 |             if (was_paused || due_to_backpressure) && worker.has_actions() {
     |       |   Branch (265:17): [True: 0, False: 9]
     |       |   Branch (265:31): [True: 0, False: 9]
     |       |   Branch (265:55): [True: 0, False: 0]
     |       |   Branch (265:17): [Folded - Ignored]
     |       |   Branch (265:31): [Folded - Ignored]
     |       |   Branch (265:55): [Folded - Ignored]
 266 |     0 |                 worker.is_paused = true;
 267 |     9 |             }
 268 |     9 |             complete_action_res
 269 |       |         };
 270 |       |
 271 |     9 |         self.worker_change_notify.notify_one();
 272 |       |
 273 |     9 |         complete_action_res
 274 |    10 |     }
 275 |       |
 276 |       |     /// Notifies the specified worker to run the given action and handles errors by evicting
 277 |       |     /// the worker if the notification fails.
 278 |    28 |     async fn worker_notify_run_action(
 279 |    28 |         &mut self,
 280 |    28 |         worker_id: WorkerId,
 281 |    28 |         operation_id: OperationId,
 282 |    28 |         action_info: ActionInfoWithProps,
 283 |    28 |     ) -> Result<(), Error> {
 284 |    28 |         if let Some(worker) = self.workers.get_mut(&worker_id) {
     |       |   Branch (284:16): [True: 0, False: 0]
     |       |   Branch (284:16): [Folded - Ignored]
     |       |   Branch (284:16): [True: 27, False: 0]
     |       |   Branch (284:16): [True: 1, False: 0]
 285 |    28 |             let notify_worker_result = worker
 286 |    28 |                 .notify_update(WorkerUpdate::RunAction((operation_id, action_info.clone())))
 287 |    28 |                 .await;
 288 |       |
 289 |    28 |             if notify_worker_result.is_err() {
     |       |   Branch (289:16): [True: 0, False: 0]
     |       |   Branch (289:16): [Folded - Ignored]
     |       |   Branch (289:16): [True: 1, False: 26]
     |       |   Branch (289:16): [True: 0, False: 1]
 290 |     1 |                 warn!(
 291 |       |                     ?worker_id,
 292 |       |                     ?action_info,
 293 |       |                     ?notify_worker_result,
 294 |     1 |                     "Worker command failed, removing worker",
 295 |       |                 );
 296 |       |
 297 |     1 |                 let err = make_err!(
 298 |     1 |                     Code::Internal,
 299 |       |                     "Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}",
 300 |       |                 );
 301 |       |
 302 |     1 |                 return Result::<(), _>::Err(err.clone())
 303 |     1 |                     .merge(self.immediate_evict_worker(&worker_id, err).await);
 304 |    27 |             }
 305 |       |         } else {
 306 |     0 |             warn!(
 307 |       |                 ?worker_id,
 308 |       |                 ?operation_id,
 309 |       |                 ?action_info,
 310 |     0 |                 "Worker not found in worker map in worker_notify_run_action"
 311 |       |             );
 312 |       |         }
 313 |    27 |         Ok(())
 314 |    28 |     }
 315 |       |
 316 |       |     /// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it.
 317 |     6 |     async fn immediate_evict_worker(
 318 |     6 |         &mut self,
 319 |     6 |         worker_id: &WorkerId,
 320 |     6 |         err: Error,
 321 |     6 |     ) -> Result<(), Error> {
 322 |     6 |         let mut result = Ok(());
 323 |     6 |         if let Some(mut worker) = self.remove_worker(worker_id) {
     |       |   Branch (323:16): [True: 5, False: 0]
     |       |   Branch (323:16): [Folded - Ignored]
     |       |   Branch (323:16): [True: 1, False: 0]
     |       |   Branch (323:16): [True: 0, False: 0]
 324 |       |             // We don't care if we fail to send message to worker, this is only a best attempt.
 325 |     6 |             drop(worker.notify_update(WorkerUpdate::Disconnect).await);
 326 |     6 |             for (operation_id, _) in worker.running_action_infos.drain() {
 327 |     4 |                 result = result.merge(
 328 |     4 |                     self.worker_state_manager
 329 |     4 |                         .update_operation(
 330 |     4 |                             &operation_id,
 331 |     4 |                             worker_id,
 332 |     4 |                             UpdateOperationType::UpdateWithError(err.clone()),
 333 |     4 |                         )
 334 |     4 |                         .await,
 335 |       |                 );
 336 |       |             }
 337 |     0 |         }
 338 |       |         // Note: Calling this many time is very cheap, it'll only trigger `do_try_match` once.
 339 |       |         // TODO(aaronmondal) This should be moved to inside the Workers struct.
 340 |     6 |         self.worker_change_notify.notify_one();
 341 |     6 |         result
 342 |     6 |     }
 343 |       | }
 344 |       |
 345 |       | #[derive(Debug, MetricsComponent)]
 346 |       | pub struct ApiWorkerScheduler {
 347 |       |     #[metric]
 348 |       |     inner: Mutex<ApiWorkerSchedulerImpl>,
 349 |       |     #[metric(group = "platform_property_manager")]
 350 |       |     platform_property_manager: Arc<PlatformPropertyManager>,
 351 |       |
 352 |       |     #[metric(
 353 |       |         help = "Timeout of how long to evict workers if no response in this given amount of time in seconds."
 354 |       |     )]
 355 |       |     worker_timeout_s: u64,
 356 |       |     _operation_keep_alive_spawn: JoinHandleDropGuard<()>,
 357 |       | }
 358 |       |
 359 |       | impl ApiWorkerScheduler {
 360 |    28 |     pub fn new(
 361 |    28 |         worker_state_manager: Arc<dyn WorkerStateManager>,
 362 |    28 |         platform_property_manager: Arc<PlatformPropertyManager>,
 363 |    28 |         allocation_strategy: WorkerAllocationStrategy,
 364 |    28 |         worker_change_notify: Arc<Notify>,
 365 |    28 |         worker_timeout_s: u64,
 366 |    28 |     ) -> Arc<Self> {
 367 |    28 |         let (operation_keep_alive_tx, mut operation_keep_alive_rx) = mpsc::unbounded_channel();
 368 |    28 |         Arc::new(Self {
 369 |    28 |             inner: Mutex::new(ApiWorkerSchedulerImpl {
 370 |    28 |                 workers: Workers(LruCache::unbounded()),
 371 |    28 |                 worker_state_manager: worker_state_manager.clone(),
 372 |    28 |                 allocation_strategy,
 373 |    28 |                 worker_change_notify,
 374 |    28 |                 operation_keep_alive_tx,
 375 |    28 |             }),
 376 |    28 |             platform_property_manager,
 377 |    28 |             worker_timeout_s,
 378 |    28 |             _operation_keep_alive_spawn: spawn!(
 379 |       |                 "simple_scheduler_operation_keep_alive",
 380 |    23 |                 async move {
 381 |       |                     const RECV_MANY_LIMIT: usize = 256;
 382 |    23 |                     let mut messages = Vec::with_capacity(RECV_MANY_LIMIT);
 383 |       |                     loop {
 384 |    23 |                         messages.clear();
 385 |    23 |                         operation_keep_alive_rx
 386 |    23 |                             .recv_many(&mut messages, RECV_MANY_LIMIT)
 387 |    23 |                             .await;
 388 |     0 |                         if messages.is_empty() {
     |       |   Branch (388:28): [True: 0, False: 0]
     |       |   Branch (388:28): [Folded - Ignored]
 389 |     0 |                             return; // Looks like our sender has been dropped.
 390 |     0 |                         }
 391 |     0 |                         for (operation_id, worker_id) in messages.drain(..) {
 392 |     0 |                             let update_operation_res = worker_state_manager
 393 |     0 |                                 .update_operation(
 394 |     0 |                                     &operation_id,
 395 |     0 |                                     &worker_id,
 396 |     0 |                                     UpdateOperationType::KeepAlive,
 397 |     0 |                                 )
 398 |     0 |                                 .await;
 399 |     0 |                             if let Err(err) = update_operation_res {
     |       |   Branch (399:36): [True: 0, False: 0]
     |       |   Branch (399:36): [Folded - Ignored]
 400 |     0 |                                 warn!(
 401 |       |                                     ?err,
 402 |     0 |                                     "Error while running worker_keep_alive_received, maybe job is done?"
 403 |       |                                 );
 404 |     0 |                             }
 405 |       |                         }
 406 |       |                     }
 407 |     0 |                 }
 408 |       |             ),
 409 |       |         })
 410 |    28 |     }
 411 |       |
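The `_operation_keep_alive_spawn` task above batches keep-alive messages with `UnboundedReceiver::recv_many` and exits once an empty batch shows that `operation_keep_alive_tx` has been dropped. A minimal sketch of that receive loop in isolation, assuming a Tokio runtime and an illustrative `u32` payload in place of `(OperationId, WorkerId)`:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<u32>();

    // Consumer mirroring the keep-alive loop: drain messages in batches and
    // stop once `recv_many` yields an empty batch (every sender was dropped).
    let consumer = tokio::spawn(async move {
        const RECV_MANY_LIMIT: usize = 256;
        let mut messages = Vec::with_capacity(RECV_MANY_LIMIT);
        let mut total = 0usize;
        loop {
            messages.clear();
            rx.recv_many(&mut messages, RECV_MANY_LIMIT).await;
            if messages.is_empty() {
                return total; // Sender side is gone and the channel is drained.
            }
            total += messages.len();
        }
    });

    for i in 0..10u32 {
        tx.send(i).expect("receiver is still alive");
    }
    drop(tx); // Closing the channel lets the consumer terminate.

    assert_eq!(consumer.await.unwrap(), 10);
}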
 412 |    28 |     pub async fn worker_notify_run_action(
 413 |    28 |         &self,
 414 |    28 |         worker_id: WorkerId,
 415 |    28 |         operation_id: OperationId,
 416 |    28 |         action_info: ActionInfoWithProps,
 417 |    28 |     ) -> Result<(), Error> {
 418 |    28 |         let mut inner = self.inner.lock().await;
 419 |    28 |         inner
 420 |    28 |             .worker_notify_run_action(worker_id, operation_id, action_info)
 421 |    28 |             .await
 422 |    28 |     }
 423 |       |
 424 |       |     /// Attempts to find a worker that is capable of running this action.
 425 |       |     // TODO(aaronmondal) This algorithm is not very efficient. Simple testing using a tree-like
 426 |       |     // structure showed worse performance on a 10_000 worker * 7 properties * 1000 queued tasks
 427 |       |     // simulation of worst cases in a single threaded environment.
 428 |    45 |     pub async fn find_worker_for_action(
 429 |    45 |         &self,
 430 |    45 |         platform_properties: &PlatformProperties,
 431 |    45 |     ) -> Option<WorkerId> {
 432 |    45 |         let inner = self.inner.lock().await;
 433 |    45 |         inner.inner_find_worker_for_action(platform_properties)
 434 |    45 |     }
 435 |       |
 436 |       |     /// Checks to see if the worker exists in the worker pool. Should only be used in unit tests.
 437 |       |     #[must_use]
 438 |     7 |     pub async fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool {
 439 |     7 |         let inner = self.inner.lock().await;
 440 |     7 |         inner.workers.contains(worker_id)
 441 |     7 |     }
 442 |       |
 443 |       |     /// A unit test function used to send the keep alive message to the worker from the server.
 444 |     0 |     pub async fn send_keep_alive_to_worker_for_test(
 445 |     0 |         &self,
 446 |     0 |         worker_id: &WorkerId,
 447 |     1 |     ) -> Result<(), Error> {
 448 |     1 |         let mut inner = self.inner.lock().await;
 449 |     1 |         let worker = inner.workers.get_mut(worker_id).ok_or_else(|| {
 450 |     0 |             make_input_err!("WorkerId '{}' does not exist in workers map", worker_id)
 451 |     0 |         })?;
 452 |     1 |         worker.keep_alive()
 453 |     1 |     }
 454 |       | }
 455 |       |
 456 |       | #[async_trait]
 457 |       | impl WorkerScheduler for ApiWorkerScheduler {
 458 |     1 |     fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
 459 |     1 |         self.platform_property_manager.as_ref()
 460 |     1 |     }
 461 |       |
 462 |    62 |     async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
 463 |    31 |         let mut inner = self.inner.lock().await;
 464 |    31 |         let worker_id = worker.id.clone();
 465 |    31 |         let result = inner
 466 |    31 |             .add_worker(worker)
 467 |    31 |             .err_tip(|| "Error while adding worker, removing from pool");
 468 |    31 |         if let Err(err) = result {
     |       |   Branch (468:16): [True: 0, False: 31]
     |       |   Branch (468:16): [Folded - Ignored]
 469 |     0 |             return Result::<(), _>::Err(err.clone())
 470 |     0 |                 .merge(inner.immediate_evict_worker(&worker_id, err).await);
 471 |    31 |         }
 472 |    31 |         Ok(())
 473 |    62 |     }
 474 |       |
 475 |       |     async fn update_action(
 476 |       |         &self,
 477 |       |         worker_id: &WorkerId,
 478 |       |         operation_id: &OperationId,
 479 |       |         update: UpdateOperationType,
 480 |    20 |     ) -> Result<(), Error> {
 481 |    10 |         let mut inner = self.inner.lock().await;
 482 |    10 |         inner.update_action(worker_id, operation_id, update).await
 483 |    20 |     }
 484 |       |
 485 |       |     async fn worker_keep_alive_received(
 486 |       |         &self,
 487 |       |         worker_id: &WorkerId,
 488 |       |         timestamp: WorkerTimestamp,
 489 |     4 |     ) -> Result<(), Error> {
 490 |     2 |         let mut inner = self.inner.lock().await;
 491 |     2 |         inner
 492 |     2 |             .refresh_lifetime(worker_id, timestamp)
 493 |     2 |             .err_tip(|| "Error refreshing lifetime in worker_keep_alive_received()")
 494 |     4 |     }
 495 |       |
 496 |     4 |     async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
 497 |     2 |         let mut inner = self.inner.lock().await;
 498 |     2 |         inner
 499 |     2 |             .immediate_evict_worker(
 500 |     2 |                 worker_id,
 501 |     2 |                 make_err!(Code::Internal, "Received request to remove worker"),
 502 |     2 |             )
 503 |     2 |             .await
 504 |     4 |     }
 505 |       |
 506 |    10 |     async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
 507 |     5 |         let mut inner = self.inner.lock().await;
 508 |       |
 509 |     5 |         let mut result = Ok(());
 510 |       |         // Items should be sorted based on last_update_timestamp, so we don't need to iterate the entire
 511 |       |         // map most of the time.
 512 |     5 |         let worker_ids_to_remove: Vec<WorkerId> = inner
 513 |     5 |             .workers
 514 |     5 |             .iter()
 515 |     5 |             .rev()
 516 |     6 |             .map_while(|(worker_id, worker)| {
 517 |     6 |                 if worker.last_update_timestamp <= now_timestamp - self.worker_timeout_s {
     |       |   Branch (517:20): [True: 2, False: 4]
     |       |   Branch (517:20): [Folded - Ignored]
 518 |     2 |                     Some(worker_id.clone())
 519 |       |                 } else {
 520 |     4 |                     None
 521 |       |                 }
 522 |     6 |             })
 523 |     5 |             .collect();
 524 |     7 |         for worker_id in &worker_ids_to_remove {
 525 |     2 |             warn!(?worker_id, "Worker timed out, removing from pool");
 526 |     2 |             result = result.merge(
 527 |     2 |                 inner
 528 |     2 |                     .immediate_evict_worker(
 529 |     2 |                         worker_id,
 530 |     2 |                         make_err!(
 531 |     2 |                             Code::Internal,
 532 |     2 |                             "Worker {worker_id} timed out, removing from pool"
 533 |     2 |                         ),
 534 |     2 |                     )
 535 |     2 |                     .await,
 536 |       |             );
 537 |       |         }
 538 |       |
 539 |     5 |         result
 540 |    10 |     }
 541 |       |
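`remove_timedout_workers` above relies on keep-alives bumping workers to the most-recently-used end of the cache, so iterating in reverse visits the stalest workers first and `map_while` can stop at the first worker that is still inside `worker_timeout_s`. A minimal sketch with illustrative worker names and timestamps:

use lru::LruCache;

fn main() {
    // Hypothetical last-update timestamps in seconds. Refreshing a worker would
    // bump it to the most-recently-used end, so the LRU end holds the stalest.
    let mut workers: LruCache<&str, u64> = LruCache::unbounded();
    workers.put("worker-a", 100);
    workers.put("worker-b", 200);
    workers.put("worker-c", 300);

    let now = 310u64;
    let worker_timeout_s = 50u64;

    // Walk from least to most recently refreshed and stop at the first worker
    // that is still alive, mirroring the `.iter().rev().map_while(..)` scan.
    let timed_out: Vec<&str> = workers
        .iter()
        .rev()
        .map_while(|(id, last_update)| (*last_update <= now - worker_timeout_s).then_some(*id))
        .collect();

    assert_eq!(timed_out, vec!["worker-a", "worker-b"]);
}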
 542 |     4 |     async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
 543 |     2 |         let mut inner = self.inner.lock().await;
 544 |     2 |         inner.set_drain_worker(worker_id, is_draining).await
 545 |     4 |     }
 546 |       | }
 547 |       |
 548 |       | impl RootMetricsComponent for ApiWorkerScheduler {}