Coverage Report

Created: 2025-03-08 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/api_worker_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::ops::{Deref, DerefMut};
16
use std::sync::Arc;
17
18
use async_lock::Mutex;
19
use lru::LruCache;
20
use nativelink_config::schedulers::WorkerAllocationStrategy;
21
use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt};
22
use nativelink_metric::{
23
    group, MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
24
    RootMetricsComponent,
25
};
26
use nativelink_util::action_messages::{OperationId, WorkerId};
27
use nativelink_util::operation_state_manager::{UpdateOperationType, WorkerStateManager};
28
use nativelink_util::platform_properties::PlatformProperties;
29
use nativelink_util::spawn;
30
use nativelink_util::task::JoinHandleDropGuard;
31
use tokio::sync::mpsc::{self, UnboundedSender};
32
use tokio::sync::Notify;
33
use tonic::async_trait;
34
use tracing::{event, Level};
35
36
use crate::platform_property_manager::PlatformPropertyManager;
37
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp, WorkerUpdate};
38
use crate::worker_scheduler::WorkerScheduler;
39
40
struct Workers(LruCache<WorkerId, Worker>);
41
42
impl Deref for Workers {
43
    type Target = LruCache<WorkerId, Worker>;
44
45
57
    fn deref(&self) -> &Self::Target {
46
57
        &self.0
47
57
    }
48
}
49
50
impl DerefMut for Workers {
51
109
    fn deref_mut(&mut self) -> &mut Self::Target {
52
109
        &mut self.0
53
109
    }
54
}
55
56
// Note: This could not be a derive macro because this derive-macro
57
// does n ot support LruCache and nameless field structs.
58
impl MetricsComponent for Workers {
59
0
    fn publish(
60
0
        &self,
61
0
        _kind: MetricKind,
62
0
        _field_metadata: MetricFieldData,
63
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
64
0
        let _enter = group!("workers").entered();
65
0
        for (worker_id, worker) in self.iter() {
66
0
            let _enter = group!(worker_id).entered();
67
0
            worker.publish(MetricKind::Component, MetricFieldData::default())?;
68
        }
69
0
        Ok(MetricPublishKnownKindData::Component)
70
0
    }
71
}
72
73
/// A collection of workers that are available to run tasks.
74
#[derive(MetricsComponent)]
75
struct ApiWorkerSchedulerImpl {
76
    /// A `LruCache` of workers availabled based on `allocation_strategy`.
77
    #[metric(group = "workers")]
78
    workers: Workers,
79
80
    /// The worker state manager.
81
    #[metric(group = "worker_state_manager")]
82
    worker_state_manager: Arc<dyn WorkerStateManager>,
83
    /// The allocation strategy for workers.
84
    allocation_strategy: WorkerAllocationStrategy,
85
    /// A channel to notify the matching engine that the worker pool has changed.
86
    worker_change_notify: Arc<Notify>,
87
    /// A channel to notify that an operation is still alive.
88
    operation_keep_alive_tx: UnboundedSender<(OperationId, WorkerId)>,
89
}
90
91
impl ApiWorkerSchedulerImpl {
92
    /// Refreshes the lifetime of the worker with the given timestamp.
93
2
    fn refresh_lifetime(
94
2
        &mut self,
95
2
        worker_id: &WorkerId,
96
2
        timestamp: WorkerTimestamp,
97
2
    ) -> Result<(), Error> {
98
2
        let worker = self.workers.0.peek_mut(worker_id).ok_or_else(|| {
99
0
            make_input_err!(
100
0
                "Worker not found in worker map in refresh_lifetime() {}",
101
0
                worker_id
102
0
            )
103
2
        })
?0
;
104
0
        error_if!(
105
2
            worker.last_update_timestamp > timestamp,
  Branch (105:13): [True: 0, False: 2]
  Branch (105:13): [Folded - Ignored]
106
            "Worker already had a timestamp of {}, but tried to update it with {}",
107
            worker.last_update_timestamp,
108
            timestamp
109
        );
110
2
        worker.last_update_timestamp = timestamp;
111
2
        for 
operation_id0
in worker.running_action_infos.keys() {
112
0
            if self
  Branch (112:16): [True: 0, False: 0]
  Branch (112:16): [Folded - Ignored]
113
0
                .operation_keep_alive_tx
114
0
                .send((operation_id.clone(), worker_id.clone()))
115
0
                .is_err()
116
            {
117
0
                event!(
118
0
                    Level::ERROR,
119
                    ?operation_id,
120
                    ?worker_id,
121
0
                    "OperationKeepAliveTx stream closed"
122
                );
123
0
            }
124
        }
125
2
        Ok(())
126
2
    }
127
128
    /// Adds a worker to the pool.
129
    /// Note: This function will not do any task matching.
130
31
    fn add_worker(&mut self, worker: Worker) -> Result<(), Error> {
131
31
        let worker_id = worker.id.clone();
132
31
        self.workers.put(worker_id.clone(), worker);
133
31
134
31
        // Worker is not cloneable, and we do not want to send the initial connection results until
135
31
        // we have added it to the map, or we might get some strange race conditions due to the way
136
31
        // the multi-threaded runtime works.
137
31
        let worker = self.workers.peek_mut(&worker_id).unwrap();
138
31
        let res = worker
139
31
            .send_initial_connection_result()
140
31
            .err_tip(|| 
"Failed to send initial connection result to worker"0
);
141
31
        if let Err(
err0
) = &res {
  Branch (141:16): [True: 0, False: 31]
  Branch (141:16): [Folded - Ignored]
142
0
            event!(
143
0
                Level::ERROR,
144
                ?worker_id,
145
                ?err,
146
0
                "Worker connection appears to have been closed while adding to pool"
147
            );
148
31
        }
149
31
        self.worker_change_notify.notify_one();
150
31
        res
151
31
    }
152
153
    /// Removes worker from pool.
154
    /// Note: The caller is responsible for any rescheduling of any tasks that might be
155
    /// running.
156
6
    fn remove_worker(&mut self, worker_id: &WorkerId) -> Option<Worker> {
157
6
        let result = self.workers.pop(worker_id);
158
6
        self.worker_change_notify.notify_one();
159
6
        result
160
6
    }
161
162
    /// Sets if the worker is draining or not.
163
2
    async fn set_drain_worker(
164
2
        &mut self,
165
2
        worker_id: &WorkerId,
166
2
        is_draining: bool,
167
2
    ) -> Result<(), Error> {
168
2
        let worker = self
169
2
            .workers
170
2
            .get_mut(worker_id)
171
2
            .err_tip(|| 
format!("Worker {worker_id} doesn't exist in the pool")0
)
?0
;
172
2
        worker.is_draining = is_draining;
173
2
        self.worker_change_notify.notify_one();
174
2
        Ok(())
175
2
    }
176
177
45
    fn inner_find_worker_for_action(
178
45
        &self,
179
45
        platform_properties: &PlatformProperties,
180
45
    ) -> Option<WorkerId> {
181
45
        let mut workers_iter = self.workers.iter();
182
45
        let workers_iter = match self.allocation_strategy {
183
            // Use rfind to get the least recently used that satisfies the properties.
184
45
            WorkerAllocationStrategy::least_recently_used => workers_iter.rfind(|(_, w)| {
185
34
                w.can_accept_work() && 
platform_properties.is_satisfied_by(&w.platform_properties)33
  Branch (185:17): [True: 33, False: 1]
  Branch (185:17): [Folded - Ignored]
186
45
            
}34
),
187
            // Use find to get the most recently used that satisfies the properties.
188
0
            WorkerAllocationStrategy::most_recently_used => workers_iter.find(|(_, w)| {
189
0
                w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
  Branch (189:17): [True: 0, False: 0]
  Branch (189:17): [Folded - Ignored]
190
0
            }),
191
        };
192
45
        workers_iter.map(|(_, w)| 
w.id.clone()28
)
193
45
    }
194
195
10
    async fn update_action(
196
10
        &mut self,
197
10
        worker_id: &WorkerId,
198
10
        operation_id: &OperationId,
199
10
        update: UpdateOperationType,
200
10
    ) -> Result<(), Error> {
201
10
        let worker = self.workers.get_mut(worker_id).err_tip(|| {
202
0
            format!("Worker {worker_id} does not exist in SimpleScheduler::update_action")
203
10
        })
?0
;
204
205
        // Ensure the worker is supposed to be running the operation.
206
10
        if !worker.running_action_infos.contains_key(operation_id) {
  Branch (206:12): [True: 1, False: 9]
  Branch (206:12): [Folded - Ignored]
207
1
            let err = make_err!(
208
1
                Code::Internal,
209
1
                "Operation {operation_id} should not be running on worker {worker_id} in SimpleScheduler::update_action"
210
1
            );
211
1
            return Result::<(), _>::Err(err.clone())
212
1
                .merge(self.immediate_evict_worker(worker_id, err).await);
213
9
        }
214
215
9
        let (is_finished, due_to_backpressure) = match &update {
216
6
            UpdateOperationType::UpdateWithActionStage(action_stage) => {
217
6
                (action_stage.is_finished(), false)
218
            }
219
0
            UpdateOperationType::KeepAlive => (false, false),
220
3
            UpdateOperationType::UpdateWithError(err) => {
221
3
                (true, err.code == Code::ResourceExhausted)
222
            }
223
        };
224
225
        // Update the operation in the worker state manager.
226
        {
227
9
            let update_operation_res = self
228
9
                .worker_state_manager
229
9
                .update_operation(operation_id, worker_id, update)
230
9
                .await
231
9
                .err_tip(|| 
"in update_operation on SimpleScheduler::update_action"0
);
232
9
            if let Err(
err0
) = update_operation_res {
  Branch (232:20): [True: 0, False: 9]
  Branch (232:20): [Folded - Ignored]
233
0
                event!(
234
0
                    Level::ERROR,
235
                    ?operation_id,
236
                    ?worker_id,
237
                    ?err,
238
0
                    "Failed to update_operation on update_action"
239
                );
240
0
                return Err(err);
241
9
            }
242
9
        }
243
9
244
9
        if !is_finished {
  Branch (244:12): [True: 0, False: 9]
  Branch (244:12): [Folded - Ignored]
245
0
            return Ok(());
246
9
        }
247
248
        // Clear this action from the current worker if finished.
249
9
        let complete_action_res = {
250
9
            let was_paused = !worker.can_accept_work();
251
252
            // Note: We need to run this before dealing with backpressure logic.
253
9
            let complete_action_res = worker.complete_action(operation_id).await;
254
255
            // Only pause if there's an action still waiting that will unpause.
256
9
            if (was_paused || due_to_backpressure) && 
worker.has_actions()0
{
  Branch (256:17): [True: 0, False: 9]
  Branch (256:31): [True: 0, False: 9]
  Branch (256:55): [True: 0, False: 0]
  Branch (256:17): [Folded - Ignored]
  Branch (256:31): [Folded - Ignored]
  Branch (256:55): [Folded - Ignored]
257
0
                worker.is_paused = true;
258
9
            }
259
9
            complete_action_res
260
9
        };
261
9
262
9
        self.worker_change_notify.notify_one();
263
9
264
9
        complete_action_res
265
10
    }
266
267
    /// Notifies the specified worker to run the given action and handles errors by evicting
268
    /// the worker if the notification fails.
269
28
    async fn worker_notify_run_action(
270
28
        &mut self,
271
28
        worker_id: WorkerId,
272
28
        operation_id: OperationId,
273
28
        action_info: ActionInfoWithProps,
274
28
    ) -> Result<(), Error> {
275
28
        if let Some(worker) = self.workers.get_mut(&worker_id) {
  Branch (275:16): [True: 0, False: 0]
  Branch (275:16): [Folded - Ignored]
  Branch (275:16): [True: 27, False: 0]
  Branch (275:16): [True: 1, False: 0]
276
28
            let notify_worker_result = worker
277
28
                .notify_update(WorkerUpdate::RunAction((operation_id, action_info.clone())))
278
28
                .await;
279
280
28
            if notify_worker_result.is_err() {
  Branch (280:16): [True: 0, False: 0]
  Branch (280:16): [Folded - Ignored]
  Branch (280:16): [True: 1, False: 26]
  Branch (280:16): [True: 0, False: 1]
281
1
                event!(
282
1
                    Level::WARN,
283
                    ?worker_id,
284
                    ?action_info,
285
                    ?notify_worker_result,
286
1
                    "Worker command failed, removing worker",
287
                );
288
289
1
                let err = make_err!(
290
1
                    Code::Internal,
291
1
                    "Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}",
292
1
                );
293
1
294
1
                return Result::<(), _>::Err(err.clone())
295
1
                    .merge(self.immediate_evict_worker(&worker_id, err).await);
296
27
            }
297
        } else {
298
0
            event!(
299
0
                Level::WARN,
300
                ?worker_id,
301
                ?operation_id,
302
                ?action_info,
303
0
                "Worker not found in worker map in worker_notify_run_action"
304
            );
305
        }
306
27
        Ok(())
307
28
    }
308
309
    /// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it.
310
6
    async fn immediate_evict_worker(
311
6
        &mut self,
312
6
        worker_id: &WorkerId,
313
6
        err: Error,
314
6
    ) -> Result<(), Error> {
315
6
        let mut result = Ok(());
316
6
        if let Some(mut worker) = self.remove_worker(worker_id) {
  Branch (316:16): [True: 5, False: 0]
  Branch (316:16): [Folded - Ignored]
  Branch (316:16): [True: 1, False: 0]
  Branch (316:16): [True: 0, False: 0]
317
            // We don't care if we fail to send message to worker, this is only a best attempt.
318
6
            let _ = worker.notify_update(WorkerUpdate::Disconnect).await;
319
6
            for (
operation_id4
, _) in worker.running_action_infos.drain() {
320
4
                result = result.merge(
321
4
                    self.worker_state_manager
322
4
                        .update_operation(
323
4
                            &operation_id,
324
4
                            worker_id,
325
4
                            UpdateOperationType::UpdateWithError(err.clone()),
326
4
                        )
327
4
                        .await,
328
                );
329
            }
330
0
        }
331
        // Note: Calling this many time is very cheap, it'll only trigger `do_try_match` once.
332
        // TODO(allada) This should be moved to inside the Workers struct.
333
6
        self.worker_change_notify.notify_one();
334
6
        result
335
6
    }
336
}
337
338
#[derive(MetricsComponent)]
339
pub struct ApiWorkerScheduler {
340
    #[metric]
341
    inner: Mutex<ApiWorkerSchedulerImpl>,
342
    #[metric(group = "platform_property_manager")]
343
    platform_property_manager: Arc<PlatformPropertyManager>,
344
345
    #[metric(
346
        help = "Timeout of how long to evict workers if no response in this given amount of time in seconds."
347
    )]
348
    worker_timeout_s: u64,
349
    _operation_keep_alive_spawn: JoinHandleDropGuard<()>,
350
}
351
352
impl ApiWorkerScheduler {
353
28
    pub fn new(
354
28
        worker_state_manager: Arc<dyn WorkerStateManager>,
355
28
        platform_property_manager: Arc<PlatformPropertyManager>,
356
28
        allocation_strategy: WorkerAllocationStrategy,
357
28
        worker_change_notify: Arc<Notify>,
358
28
        worker_timeout_s: u64,
359
28
    ) -> Arc<Self> {
360
28
        let (operation_keep_alive_tx, mut operation_keep_alive_rx) = mpsc::unbounded_channel();
361
28
        Arc::new(Self {
362
28
            inner: Mutex::new(ApiWorkerSchedulerImpl {
363
28
                workers: Workers(LruCache::unbounded()),
364
28
                worker_state_manager: worker_state_manager.clone(),
365
28
                allocation_strategy,
366
28
                worker_change_notify,
367
28
                operation_keep_alive_tx,
368
28
            }),
369
28
            platform_property_manager,
370
28
            worker_timeout_s,
371
28
            _operation_keep_alive_spawn: spawn!(
372
28
                "simple_scheduler_operation_keep_alive",
373
28
                async move 
{23
374
                    const RECV_MANY_LIMIT: usize = 256;
375
23
                    let mut messages = Vec::with_capacity(RECV_MANY_LIMIT);
376
                    loop {
377
23
                        messages.clear();
378
23
                        operation_keep_alive_rx
379
23
                            .recv_many(&mut messages, RECV_MANY_LIMIT)
380
23
                            .await;
381
0
                        if messages.is_empty() {
  Branch (381:28): [True: 0, False: 0]
  Branch (381:28): [Folded - Ignored]
382
0
                            return; // Looks like our sender has been dropped.
383
0
                        }
384
0
                        for (operation_id, worker_id) in messages.drain(..) {
385
0
                            let update_operation_res = worker_state_manager
386
0
                                .update_operation(
387
0
                                    &operation_id,
388
0
                                    &worker_id,
389
0
                                    UpdateOperationType::KeepAlive,
390
0
                                )
391
0
                                .await;
392
0
                            if let Err(err) = update_operation_res {
  Branch (392:36): [True: 0, False: 0]
  Branch (392:36): [Folded - Ignored]
393
0
                                event!(Level::WARN, ?err, "Error while running worker_keep_alive_received, maybe job is done?");
394
0
                            }
395
                        }
396
                    }
397
28
                
}0
398
28
            ),
399
        })
400
28
    }
401
402
28
    pub async fn worker_notify_run_action(
403
28
        &self,
404
28
        worker_id: WorkerId,
405
28
        operation_id: OperationId,
406
28
        action_info: ActionInfoWithProps,
407
28
    ) -> Result<(), Error> {
408
28
        let mut inner = self.inner.lock().await;
409
28
        inner
410
28
            .worker_notify_run_action(worker_id, operation_id, action_info)
411
28
            .await
412
28
    }
413
414
    /// Attempts to find a worker that is capable of running this action.
415
    // TODO(blaise.bruer) This algorithm is not very efficient. Simple testing using a tree-like
416
    // structure showed worse performance on a 10_000 worker * 7 properties * 1000 queued tasks
417
    // simulation of worst cases in a single threaded environment.
418
45
    pub async fn find_worker_for_action(
419
45
        &self,
420
45
        platform_properties: &PlatformProperties,
421
45
    ) -> Option<WorkerId> {
422
45
        let inner = self.inner.lock().await;
423
45
        inner.inner_find_worker_for_action(platform_properties)
424
45
    }
425
426
    /// Checks to see if the worker exists in the worker pool. Should only be used in unit tests.
427
    #[must_use]
428
7
    pub async fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool {
429
7
        let inner = self.inner.lock().await;
430
7
        inner.workers.contains(worker_id)
431
7
    }
432
433
    /// A unit test function used to send the keep alive message to the worker from the server.
434
0
    pub async fn send_keep_alive_to_worker_for_test(
435
0
        &self,
436
0
        worker_id: &WorkerId,
437
1
    ) -> Result<(), Error> {
438
1
        let mut inner = self.inner.lock().await;
439
1
        let worker = inner.workers.get_mut(worker_id).ok_or_else(|| {
440
0
            make_input_err!("WorkerId '{}' does not exist in workers map", worker_id)
441
1
        })
?0
;
442
1
        worker.keep_alive()
443
1
    }
444
}
445
446
#[async_trait]
447
impl WorkerScheduler for ApiWorkerScheduler {
448
1
    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
449
1
        self.platform_property_manager.as_ref()
450
1
    }
451
452
31
    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
453
31
        let mut inner = self.inner.lock().await;
454
31
        let worker_id = worker.id.clone();
455
31
        let result = inner
456
31
            .add_worker(worker)
457
31
            .err_tip(|| 
"Error while adding worker, removing from pool"0
);
458
31
        if let Err(
err0
) = result {
  Branch (458:16): [True: 0, False: 31]
  Branch (458:16): [Folded - Ignored]
459
0
            return Result::<(), _>::Err(err.clone())
460
0
                .merge(inner.immediate_evict_worker(&worker_id, err).await);
461
31
        }
462
31
        Ok(())
463
62
    }
464
465
    async fn update_action(
466
        &self,
467
        worker_id: &WorkerId,
468
        operation_id: &OperationId,
469
        update: UpdateOperationType,
470
10
    ) -> Result<(), Error> {
471
10
        let mut inner = self.inner.lock().await;
472
10
        inner.update_action(worker_id, operation_id, update).await
473
20
    }
474
475
    async fn worker_keep_alive_received(
476
        &self,
477
        worker_id: &WorkerId,
478
        timestamp: WorkerTimestamp,
479
2
    ) -> Result<(), Error> {
480
2
        let mut inner = self.inner.lock().await;
481
2
        inner
482
2
            .refresh_lifetime(worker_id, timestamp)
483
2
            .err_tip(|| 
"Error refreshing lifetime in worker_keep_alive_received()"0
)
484
4
    }
485
486
2
    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
487
2
        let mut inner = self.inner.lock().await;
488
2
        inner
489
2
            .immediate_evict_worker(
490
2
                worker_id,
491
2
                make_err!(Code::Internal, "Received request to remove worker"),
492
2
            )
493
2
            .await
494
4
    }
495
496
5
    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
497
5
        let mut inner = self.inner.lock().await;
498
499
5
        let mut result = Ok(());
500
5
        // Items should be sorted based on last_update_timestamp, so we don't need to iterate the entire
501
5
        // map most of the time.
502
5
        let worker_ids_to_remove: Vec<WorkerId> = inner
503
5
            .workers
504
5
            .iter()
505
5
            .rev()
506
6
            .map_while(|(worker_id, worker)| {
507
6
                if worker.last_update_timestamp <= now_timestamp - self.worker_timeout_s {
  Branch (507:20): [True: 2, False: 4]
  Branch (507:20): [Folded - Ignored]
508
2
                    Some(worker_id.clone())
509
                } else {
510
4
                    None
511
                }
512
6
            })
513
5
            .collect();
514
7
        for 
worker_id2
in &worker_ids_to_remove {
515
2
            event!(
516
2
                Level::WARN,
517
                ?worker_id,
518
2
                "Worker timed out, removing from pool"
519
            );
520
2
            result = result.merge(
521
2
                inner
522
2
                    .immediate_evict_worker(
523
2
                        worker_id,
524
2
                        make_err!(
525
2
                            Code::Internal,
526
2
                            "Worker {worker_id} timed out, removing from pool"
527
2
                        ),
528
2
                    )
529
2
                    .await,
530
            );
531
        }
532
533
5
        result
534
10
    }
535
536
2
    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
537
2
        let mut inner = self.inner.lock().await;
538
2
        inner.set_drain_worker(worker_id, is_draining).await
539
4
    }
540
}
541
542
impl RootMetricsComponent for ApiWorkerScheduler {}