Coverage Report

Created: 2025-11-04 17:54

/build/source/nativelink-scheduler/src/api_worker_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::ops::{Deref, DerefMut};
16
use std::sync::Arc;
17
18
use async_lock::Mutex;
19
use lru::LruCache;
20
use nativelink_config::schedulers::WorkerAllocationStrategy;
21
use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err};
22
use nativelink_metric::{
23
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
24
    RootMetricsComponent, group,
25
};
26
use nativelink_util::action_messages::{OperationId, WorkerId};
27
use nativelink_util::operation_state_manager::{UpdateOperationType, WorkerStateManager};
28
use nativelink_util::platform_properties::PlatformProperties;
29
use nativelink_util::shutdown_guard::ShutdownGuard;
30
use nativelink_util::spawn;
31
use nativelink_util::task::JoinHandleDropGuard;
32
use tokio::sync::Notify;
33
use tokio::sync::mpsc::{self, UnboundedSender};
34
use tonic::async_trait;
35
#[cfg(feature = "worker_find_logging")]
36
use tracing::info;
37
use tracing::{error, warn};
38
39
use crate::platform_property_manager::PlatformPropertyManager;
40
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp, WorkerUpdate};
41
use crate::worker_scheduler::WorkerScheduler;
42
43
#[derive(Debug)]
44
struct Workers(LruCache<WorkerId, Worker>);
45
46
impl Deref for Workers {
47
    type Target = LruCache<WorkerId, Worker>;
48
49
62
    fn deref(&self) -> &Self::Target {
50
62
        &self.0
51
62
    }
52
}
53
54
impl DerefMut for Workers {
55
125
    fn deref_mut(&mut self) -> &mut Self::Target {
56
125
        &mut self.0
57
125
    }
58
}
59
60
// Note: This could not use the derive macro because the derive macro
61
// does not support LruCache or nameless field structs.
62
impl MetricsComponent for Workers {
63
0
    fn publish(
64
0
        &self,
65
0
        _kind: MetricKind,
66
0
        _field_metadata: MetricFieldData,
67
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
68
0
        let _enter = group!("workers").entered();
69
0
        for (worker_id, worker) in self.iter() {
70
0
            let _enter = group!(worker_id).entered();
71
0
            worker.publish(MetricKind::Component, MetricFieldData::default())?;
72
        }
73
0
        Ok(MetricPublishKnownKindData::Component)
74
0
    }
75
}
76
77
/// A collection of workers that are available to run tasks.
78
#[derive(MetricsComponent)]
79
struct ApiWorkerSchedulerImpl {
80
    /// A `LruCache` of workers available based on `allocation_strategy`.
81
    #[metric(group = "workers")]
82
    workers: Workers,
83
84
    /// The worker state manager.
85
    #[metric(group = "worker_state_manager")]
86
    worker_state_manager: Arc<dyn WorkerStateManager>,
87
    /// The allocation strategy for workers.
88
    allocation_strategy: WorkerAllocationStrategy,
89
    /// A channel to notify the matching engine that the worker pool has changed.
90
    worker_change_notify: Arc<Notify>,
91
    /// A channel to notify that an operation is still alive.
92
    operation_keep_alive_tx: UnboundedSender<(OperationId, WorkerId)>,
93
94
    /// Whether the worker scheduler is shutting down.
95
    shutting_down: bool,
96
}
97
98
impl core::fmt::Debug for ApiWorkerSchedulerImpl {
99
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
100
0
        f.debug_struct("ApiWorkerSchedulerImpl")
101
0
            .field("workers", &self.workers)
102
0
            .field("allocation_strategy", &self.allocation_strategy)
103
0
            .field("worker_change_notify", &self.worker_change_notify)
104
0
            .field("operation_keep_alive_tx", &self.operation_keep_alive_tx)
105
0
            .finish_non_exhaustive()
106
0
    }
107
}
108
109
impl ApiWorkerSchedulerImpl {
110
    /// Refreshes the lifetime of the worker with the given timestamp.
111
2
    fn refresh_lifetime(
112
2
        &mut self,
113
2
        worker_id: &WorkerId,
114
2
        timestamp: WorkerTimestamp,
115
2
    ) -> Result<(), Error> {
116
2
        let worker = self.workers.0.peek_mut(worker_id).ok_or_else(|| {
117
0
            make_input_err!(
118
                "Worker not found in worker map in refresh_lifetime() {}",
119
                worker_id
120
            )
121
0
        })?;
122
0
        error_if!(
123
2
            worker.last_update_timestamp > timestamp,
  Branch (123:13): [True: 0, False: 2]
  Branch (123:13): [Folded - Ignored]
124
            "Worker already had a timestamp of {}, but tried to update it with {}",
125
            worker.last_update_timestamp,
126
            timestamp
127
        );
128
2
        worker.last_update_timestamp = timestamp;
129
2
        for operation_id in worker.running_action_infos.keys() {
130
0
            if self
  Branch (130:16): [True: 0, False: 0]
  Branch (130:16): [Folded - Ignored]
131
0
                .operation_keep_alive_tx
132
0
                .send((operation_id.clone(), worker_id.clone()))
133
0
                .is_err()
134
            {
135
0
                error!(
136
                    ?operation_id,
137
                    ?worker_id,
138
0
                    "OperationKeepAliveTx stream closed"
139
                );
140
0
            }
141
        }
142
2
        Ok(())
143
2
    }
144
145
    /// Adds a worker to the pool.
146
    /// Note: This function will not do any task matching.
147
35
    fn add_worker(&mut self, worker: Worker) -> Result<(), Error> {
148
35
        let worker_id = worker.id.clone();
149
35
        self.workers.put(worker_id.clone(), worker);
150
151
        // Worker is not cloneable, and we do not want to send the initial connection results until
152
        // we have added it to the map, or we might get some strange race conditions due to the way
153
        // the multi-threaded runtime works.
154
35
        let worker = self.workers.peek_mut(&worker_id).unwrap();
155
35
        let res = worker
156
35
            .send_initial_connection_result()
157
35
            .err_tip(|| "Failed to send initial connection result to worker");
158
35
        if let Err(err) = &res {
  Branch (158:16): [True: 0, False: 35]
  Branch (158:16): [Folded - Ignored]
159
0
            error!(
160
                ?worker_id,
161
                ?err,
162
0
                "Worker connection appears to have been closed while adding to pool"
163
            );
164
35
        }
165
35
        self.worker_change_notify.notify_one();
166
35
        res
167
35
    }
168
169
    /// Removes worker from pool.
170
    /// Note: The caller is responsible for any rescheduling of any tasks that might be
171
    /// running.
172
10
    fn remove_worker(&mut self, worker_id: &WorkerId) -> Option<Worker> {
173
10
        let result = self.workers.pop(worker_id);
174
10
        self.worker_change_notify.notify_one();
175
10
        result
176
10
    }
177
178
    /// Sets if the worker is draining or not.
179
2
    async fn set_drain_worker(
180
2
        &mut self,
181
2
        worker_id: &WorkerId,
182
2
        is_draining: bool,
183
2
    ) -> Result<(), Error> {
184
2
        let worker = self
185
2
            .workers
186
2
            .get_mut(worker_id)
187
2
            .err_tip(|| format!("Worker {worker_id} doesn't exist in the pool"))?;
188
2
        worker.is_draining = is_draining;
189
2
        self.worker_change_notify.notify_one();
190
2
        Ok(())
191
2
    }
192
193
    #[cfg_attr(not(feature = "worker_find_logging"), allow(unused_variables))]
194
38
    fn inner_worker_checker(
195
38
        (worker_id, w): &(&WorkerId, &Worker),
196
38
        platform_properties: &PlatformProperties,
197
38
    ) -> bool {
198
        #[cfg(feature = "worker_find_logging")]
199
        {
200
            if !w.can_accept_work() {
201
                info!(
202
                    "Worker {worker_id} cannot accept work because is_paused: {}, is_draining: {}",
203
                    w.is_paused, w.is_draining
204
                );
205
                return false;
206
            }
207
            if !platform_properties.is_satisfied_by(&w.platform_properties) {
208
                info!("Worker {worker_id} properties are insufficient");
209
                return false;
210
            }
211
            return true;
212
        }
213
        #[cfg(not(feature = "worker_find_logging"))]
214
        {
215
38
            w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
  Branch (215:13): [True: 37, False: 1]
  Branch (215:13): [Folded - Ignored]
216
        }
217
38
    }
218
219
50
    fn inner_find_worker_for_action(
220
50
        &self,
221
50
        platform_properties: &PlatformProperties,
222
50
    ) -> Option<WorkerId> {
223
50
        let mut workers_iter = self.workers.iter();
224
50
        let workers_iter =
225
50
            match self.allocation_strategy {
226
                // Use rfind to get the least recently used that satisfies the properties.
227
50
                WorkerAllocationStrategy::LeastRecentlyUsed => workers_iter
228
50
                    .rfind(|worker| Self::inner_worker_checker(worker, platform_properties)),
229
                // Use find to get the most recently used that satisfies the properties.
230
0
                WorkerAllocationStrategy::MostRecentlyUsed => workers_iter
231
0
                    .find(|worker| Self::inner_worker_checker(worker, platform_properties)),
232
            };
233
50
        workers_iter.map(|(_, w)| w.id.clone())
234
50
    }
235
236
10
    async fn update_action(
237
10
        &mut self,
238
10
        worker_id: &WorkerId,
239
10
        operation_id: &OperationId,
240
10
        update: UpdateOperationType,
241
10
    ) -> Result<(), Error> {
242
10
        let worker = self.workers.get_mut(worker_id).err_tip(|| {
243
0
            format!("Worker {worker_id} does not exist in SimpleScheduler::update_action")
244
0
        })?;
245
246
        // Ensure the worker is supposed to be running the operation.
247
10
        if !worker.running_action_infos.contains_key(operation_id) {
  Branch (247:12): [True: 1, False: 9]
  Branch (247:12): [Folded - Ignored]
248
1
            let err = make_err!(
249
1
                Code::Internal,
250
                "Operation {operation_id} should not be running on worker {worker_id} in SimpleScheduler::update_action"
251
            );
252
1
            return Result::<(), _>::Err(err.clone())
253
1
                .merge(self.immediate_evict_worker(worker_id, err, false).await);
254
9
        }
255
256
9
        let (is_finished, due_to_backpressure) = match &update {
257
6
            UpdateOperationType::UpdateWithActionStage(action_stage) => {
258
6
                (action_stage.is_finished(), false)
259
            }
260
0
            UpdateOperationType::KeepAlive => (false, false),
261
3
            UpdateOperationType::UpdateWithError(err) => {
262
3
                (true, err.code == Code::ResourceExhausted)
263
            }
264
0
            UpdateOperationType::UpdateWithDisconnect => (true, false),
265
            UpdateOperationType::ExecutionComplete => {
266
                // No update here, just restoring platform properties.
267
0
                worker.execution_complete(operation_id);
268
0
                self.worker_change_notify.notify_one();
269
0
                return Ok(());
270
            }
271
        };
272
273
        // Update the operation in the worker state manager.
274
        {
275
9
            let update_operation_res = self
276
9
                .worker_state_manager
277
9
                .update_operation(operation_id, worker_id, update)
278
9
                .await
279
8
                .err_tip(|| "in update_operation on SimpleScheduler::update_action");
280
8
            if let Err(err) = update_operation_res {
  Branch (280:20): [True: 0, False: 8]
  Branch (280:20): [Folded - Ignored]
281
0
                error!(
282
                    ?operation_id,
283
                    ?worker_id,
284
                    ?err,
285
0
                    "Failed to update_operation on update_action"
286
                );
287
0
                return Err(err);
288
8
            }
289
        }
290
291
8
        if !is_finished {
  Branch (291:12): [True: 0, False: 8]
  Branch (291:12): [Folded - Ignored]
292
0
            return Ok(());
293
8
        }
294
295
        // Clear this action from the current worker if finished.
296
8
        let complete_action_res = {
297
8
            let was_paused = !worker.can_accept_work();
298
299
            // Note: We need to run this before dealing with backpressure logic.
300
8
            let complete_action_res = worker.complete_action(operation_id).await;
301
302
            // Only pause if there's an action still waiting that will unpause.
303
8
            if (was_paused || due_to_backpressure) && worker.has_actions() {
  Branch (303:17): [True: 0, False: 8]
  Branch (303:31): [True: 0, False: 8]
  Branch (303:55): [True: 0, False: 0]
  Branch (303:17): [Folded - Ignored]
  Branch (303:31): [Folded - Ignored]
  Branch (303:55): [Folded - Ignored]
304
0
                worker.is_paused = true;
305
8
            }
306
8
            complete_action_res
307
        };
308
309
8
        self.worker_change_notify.notify_one();
310
311
8
        complete_action_res
312
9
    }
313
314
    /// Notifies the specified worker to run the given action and handles errors by evicting
315
    /// the worker if the notification fails.
316
32
    async fn worker_notify_run_action(
317
32
        &mut self,
318
32
        worker_id: WorkerId,
319
32
        operation_id: OperationId,
320
32
        action_info: ActionInfoWithProps,
321
32
    ) -> Result<(), Error> {
322
32
        if let Some(worker) = self.workers.get_mut(&worker_id) {
  Branch (322:16): [True: 0, False: 0]
  Branch (322:16): [Folded - Ignored]
  Branch (322:16): [True: 4, False: 0]
  Branch (322:16): [True: 27, False: 0]
  Branch (322:16): [True: 1, False: 0]
323
32
            let notify_worker_result = worker
324
32
                .notify_update(WorkerUpdate::RunAction((operation_id, action_info.clone())))
325
32
                .await;
326
327
32
            if let Err(notify_worker_result) = notify_worker_result {
  Branch (327:20): [True: 0, False: 0]
  Branch (327:20): [Folded - Ignored]
  Branch (327:20): [True: 0, False: 4]
  Branch (327:20): [True: 1, False: 26]
  Branch (327:20): [True: 0, False: 1]
328
1
                warn!(
329
                    ?worker_id,
330
                    ?action_info,
331
                    ?notify_worker_result,
332
1
                    "Worker command failed, removing worker",
333
                );
334
335
                // A slightly nasty way of figuring out that the worker disconnected
336
                // from send_msg_to_worker without introducing complexity to the
337
                // code path from here to there.
338
1
                let is_disconnect = notify_worker_result.code == Code::Internal
  Branch (338:37): [True: 0, False: 0]
  Branch (338:37): [Folded - Ignored]
  Branch (338:37): [True: 0, False: 0]
  Branch (338:37): [True: 1, False: 0]
  Branch (338:37): [True: 0, False: 0]
339
1
                    && notify_worker_result.messages.len() == 1
  Branch (339:24): [True: 0, False: 0]
  Branch (339:24): [Folded - Ignored]
  Branch (339:24): [True: 0, False: 0]
  Branch (339:24): [True: 1, False: 0]
  Branch (339:24): [True: 0, False: 0]
340
1
                    && notify_worker_result.messages[0] == "Worker Disconnected";
341
342
1
                let err = make_err!(
343
1
                    Code::Internal,
344
                    "Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}",
345
                );
346
347
1
                return Result::<(), _>::Err(err.clone()).merge(
348
1
                    self.immediate_evict_worker(&worker_id, err, is_disconnect)
349
1
                        .await,
350
                );
351
31
            }
352
31
            Ok(())
353
        } else {
354
0
            warn!(
355
                ?worker_id,
356
                ?operation_id,
357
                ?action_info,
358
0
                "Worker not found in worker map in worker_notify_run_action"
359
            );
360
            // Ensure the operation is put back to queued state.
361
0
            self.worker_state_manager
362
0
                .update_operation(
363
0
                    &operation_id,
364
0
                    &worker_id,
365
0
                    UpdateOperationType::UpdateWithDisconnect,
366
0
                )
367
0
                .await
368
        }
369
32
    }
370
371
    /// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it.
372
10
    async fn immediate_evict_worker(
373
10
        &mut self,
374
10
        worker_id: &WorkerId,
375
10
        err: Error,
376
10
        is_disconnect: bool,
377
10
    ) -> Result<(), Error> {
378
10
        let mut result = Ok(());
379
10
        if let Some(mut worker) = self.remove_worker(worker_id) {
  Branch (379:16): [True: 9, False: 0]
  Branch (379:16): [Folded - Ignored]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 1, False: 0]
  Branch (379:16): [True: 0, False: 0]
380
            // We don't care if we fail to send message to worker, this is only a best attempt.
381
10
            drop(worker.notify_update(WorkerUpdate::Disconnect).await);
382
10
            let update = if is_disconnect {
  Branch (382:29): [True: 0, False: 9]
  Branch (382:29): [Folded - Ignored]
  Branch (382:29): [True: 0, False: 0]
  Branch (382:29): [True: 0, False: 1]
  Branch (382:29): [True: 0, False: 0]
383
0
                UpdateOperationType::UpdateWithDisconnect
384
            } else {
385
10
                UpdateOperationType::UpdateWithError(err)
386
            };
387
10
            for (operation_id, _) in worker.running_action_infos.drain() {
388
8
                result = result.merge(
389
8
                    self.worker_state_manager
390
8
                        .update_operation(&operation_id, worker_id, update.clone())
391
8
                        .await,
392
                );
393
            }
394
0
        }
395
        // Note: Calling this many times is very cheap; it'll only trigger `do_try_match` once.
396
        // TODO(palfrey) This should be moved to inside the Workers struct.
397
10
        self.worker_change_notify.notify_one();
398
10
        result
399
10
    }
400
}
401
402
#[derive(Debug, MetricsComponent)]
403
pub struct ApiWorkerScheduler {
404
    #[metric]
405
    inner: Mutex<ApiWorkerSchedulerImpl>,
406
    #[metric(group = "platform_property_manager")]
407
    platform_property_manager: Arc<PlatformPropertyManager>,
408
409
    #[metric(
410
        help = "Timeout of how long to evict workers if no response in this given amount of time in seconds."
411
    )]
412
    worker_timeout_s: u64,
413
    _operation_keep_alive_spawn: JoinHandleDropGuard<()>,
414
}
415
416
impl ApiWorkerScheduler {
417
29
    pub fn new(
418
29
        worker_state_manager: Arc<dyn WorkerStateManager>,
419
29
        platform_property_manager: Arc<PlatformPropertyManager>,
420
29
        allocation_strategy: WorkerAllocationStrategy,
421
29
        worker_change_notify: Arc<Notify>,
422
29
        worker_timeout_s: u64,
423
29
    ) -> Arc<Self> {
424
29
        let (operation_keep_alive_tx, mut operation_keep_alive_rx) = mpsc::unbounded_channel();
425
29
        Arc::new(Self {
426
29
            inner: Mutex::new(ApiWorkerSchedulerImpl {
427
29
                workers: Workers(LruCache::unbounded()),
428
29
                worker_state_manager: worker_state_manager.clone(),
429
29
                allocation_strategy,
430
29
                worker_change_notify,
431
29
                operation_keep_alive_tx,
432
29
                shutting_down: false,
433
29
            }),
434
29
            platform_property_manager,
435
29
            worker_timeout_s,
436
29
            _operation_keep_alive_spawn: spawn!(
437
                "simple_scheduler_operation_keep_alive",
438
25
                async move {
439
                    const RECV_MANY_LIMIT: usize = 256;
440
25
                    let mut messages = Vec::with_capacity(RECV_MANY_LIMIT);
441
                    loop {
442
25
                        messages.clear();
443
25
                        operation_keep_alive_rx
444
25
                            .recv_many(&mut messages, RECV_MANY_LIMIT)
445
25
                            .await;
446
0
                        if messages.is_empty() {
  Branch (446:28): [True: 0, False: 0]
  Branch (446:28): [Folded - Ignored]
447
0
                            return; // Looks like our sender has been dropped.
448
0
                        }
449
0
                        for (operation_id, worker_id) in messages.drain(..) {
450
0
                            let update_operation_res = worker_state_manager
451
0
                                .update_operation(
452
0
                                    &operation_id,
453
0
                                    &worker_id,
454
0
                                    UpdateOperationType::KeepAlive,
455
0
                                )
456
0
                                .await;
457
0
                            if let Err(err) = update_operation_res {
  Branch (457:36): [True: 0, False: 0]
  Branch (457:36): [Folded - Ignored]
458
0
                                warn!(
459
                                    ?err,
460
0
                                    "Error while running worker_keep_alive_received, maybe job is done?"
461
                                );
462
0
                            }
463
                        }
464
                    }
465
0
                }
466
            ),
467
        })
468
29
    }
469
470
32
    pub async fn worker_notify_run_action(
471
32
        &self,
472
32
        worker_id: WorkerId,
473
32
        operation_id: OperationId,
474
32
        action_info: ActionInfoWithProps,
475
32
    ) -> Result<(), Error> {
476
32
        let mut inner = self.inner.lock().await;
477
32
        inner
478
32
            .worker_notify_run_action(worker_id, operation_id, action_info)
479
32
            .await
480
32
    }
481
482
    /// Attempts to find a worker that is capable of running this action.
483
    // TODO(palfrey) This algorithm is not very efficient. Simple testing using a tree-like
484
    // structure showed worse performance on a 10_000 worker * 7 properties * 1000 queued tasks
485
    // simulation of worst cases in a single threaded environment.
486
50
    pub async fn find_worker_for_action(
487
50
        &self,
488
50
        platform_properties: &PlatformProperties,
489
50
    ) -> Option<WorkerId> {
490
50
        let inner = self.inner.lock().await;
491
50
        inner.inner_find_worker_for_action(platform_properties)
492
50
    }
493
494
    /// Checks to see if the worker exists in the worker pool. Should only be used in unit tests.
495
    #[must_use]
496
7
    pub async fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool {
497
7
        let inner = self.inner.lock().await;
498
7
        inner.workers.contains(worker_id)
499
7
    }
500
501
    /// A unit test function used to send the keep alive message to the worker from the server.
502
0
    pub async fn send_keep_alive_to_worker_for_test(
503
0
        &self,
504
0
        worker_id: &WorkerId,
505
1
    ) -> Result<(), Error> {
506
1
        let mut inner = self.inner.lock().await;
507
1
        let worker = inner.workers.get_mut(worker_id).ok_or_else(|| {
508
0
            make_input_err!("WorkerId '{}' does not exist in workers map", worker_id)
509
0
        })?;
510
1
        worker.keep_alive()
511
1
    }
512
}
513
514
#[async_trait]
515
impl WorkerScheduler for ApiWorkerScheduler {
516
1
    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
517
1
        self.platform_property_manager.as_ref()
518
1
    }
519
520
35
    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
521
        let worker_id = worker.id.clone();
522
        let mut inner = self.inner.lock().await;
523
        if inner.shutting_down {
524
            warn!("Rejected worker add during shutdown: {}", worker_id);
525
            return Err(make_err!(
526
                Code::Unavailable,
527
                "Received request to add worker while shutting down"
528
            ));
529
        }
530
        let result = inner
531
            .add_worker(worker)
532
            .err_tip(|| "Error while adding worker, removing from pool");
533
        if let Err(err) = result {
534
            return Result::<(), _>::Err(err.clone())
535
                .merge(inner.immediate_evict_worker(&worker_id, err, false).await);
536
        }
537
        Ok(())
538
35
    }
539
540
    async fn update_action(
541
        &self,
542
        worker_id: &WorkerId,
543
        operation_id: &OperationId,
544
        update: UpdateOperationType,
545
10
    ) -> Result<(), Error> {
546
        let mut inner = self.inner.lock().await;
547
        inner.update_action(worker_id, operation_id, update).await
548
10
    }
549
550
    async fn worker_keep_alive_received(
551
        &self,
552
        worker_id: &WorkerId,
553
        timestamp: WorkerTimestamp,
554
2
    ) -> Result<(), Error> {
555
        let mut inner = self.inner.lock().await;
556
        inner
557
            .refresh_lifetime(worker_id, timestamp)
558
            .err_tip(|| "Error refreshing lifetime in worker_keep_alive_received()")
559
2
    }
560
561
6
    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
562
        let mut inner = self.inner.lock().await;
563
        inner
564
            .immediate_evict_worker(
565
                worker_id,
566
                make_err!(Code::Internal, "Received request to remove worker"),
567
                false,
568
            )
569
            .await
570
6
    }
571
572
0
    async fn shutdown(&self, shutdown_guard: ShutdownGuard) {
573
        let mut inner = self.inner.lock().await;
574
        inner.shutting_down = true; // should reject further worker registration
575
        while let Some(worker_id) = inner
576
            .workers
577
            .peek_lru()
578
0
            .map(|(worker_id, _worker)| worker_id.clone())
579
        {
580
            if let Err(err) = inner
581
                .immediate_evict_worker(
582
                    &worker_id,
583
                    make_err!(Code::Internal, "Scheduler shutdown"),
584
                    true,
585
                )
586
                .await
587
            {
588
                error!(?err, "Error evicting worker on shutdown.");
589
            }
590
        }
591
        drop(shutdown_guard);
592
0
    }
593
594
5
    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
595
        let mut inner = self.inner.lock().await;
596
597
        let mut result = Ok(());
598
        // Items should be sorted based on last_update_timestamp, so we don't need to iterate the entire
599
        // map most of the time.
600
        let worker_ids_to_remove: Vec<WorkerId> = inner
601
            .workers
602
            .iter()
603
            .rev()
604
6
            .map_while(|(worker_id, worker)| {
605
6
                if worker.last_update_timestamp <= now_timestamp - self.worker_timeout_s {
  Branch (605:20): [True: 2, False: 4]
  Branch (605:20): [Folded - Ignored]
606
2
                    Some(worker_id.clone())
607
                } else {
608
4
                    None
609
                }
610
6
            })
611
            .collect();
612
        for worker_id in &worker_ids_to_remove {
613
            warn!(?worker_id, "Worker timed out, removing from pool");
614
            result = result.merge(
615
                inner
616
                    .immediate_evict_worker(
617
                        worker_id,
618
                        make_err!(
619
                            Code::Internal,
620
                            "Worker {worker_id} timed out, removing from pool"
621
                        ),
622
                        false,
623
                    )
624
                    .await,
625
            );
626
        }
627
628
        result
629
5
    }
630
631
2
    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
632
        let mut inner = self.inner.lock().await;
633
        inner.set_drain_worker(worker_id, is_draining).await
634
2
    }
635
}
636
637
impl RootMetricsComponent for ApiWorkerScheduler {}
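
The allocation-strategy branch in inner_find_worker_for_action (source lines 219-234) relies on the lru crate's iteration order: iter() walks entries from most- to least-recently used, so find() returns the MostRecentlyUsed match and rfind() the LeastRecentlyUsed one. The following standalone sketch is not part of the covered file; the worker names, memory values, and the "fits" predicate are illustrative stand-ins for the platform-property check.

// Sketch only: worker IDs and the memory-based predicate are illustrative.
use lru::LruCache;

fn main() {
    let mut workers: LruCache<&str, u64> = LruCache::unbounded();
    workers.put("worker-a", 4); // least recently used after the later puts
    workers.put("worker-b", 8);
    workers.put("worker-c", 8); // most recently used

    // Stand-in for Self::inner_worker_checker: "does this worker fit?"
    let fits = |entry: &(&&str, &u64)| *entry.1 >= 8;

    // MostRecentlyUsed: scan from the front of the iterator.
    let mru = workers.iter().find(fits).map(|(id, _)| *id);
    // LeastRecentlyUsed: scan from the back, as the rfind() above does.
    let lru = workers.iter().rfind(fits).map(|(id, _)| *id);

    assert_eq!(mru, Some("worker-c"));
    assert_eq!(lru, Some("worker-b"));
}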
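
The operation keep-alive channel created in ApiWorkerScheduler::new (around source line 436) follows a batched fan-in pattern: an unbounded tokio mpsc channel drained with recv_many(), where an empty batch means every sender has been dropped and the task can exit. The standalone sketch below shows the same pattern with illustrative payload types, not the nativelink operation and worker IDs.

// Sketch only: the (operation, worker) payload here is a plain tuple of u64s.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    const RECV_MANY_LIMIT: usize = 256;
    let (tx, mut rx) = mpsc::unbounded_channel::<(u64, u64)>();

    let drainer = tokio::spawn(async move {
        let mut batch = Vec::with_capacity(RECV_MANY_LIMIT);
        loop {
            batch.clear();
            rx.recv_many(&mut batch, RECV_MANY_LIMIT).await;
            if batch.is_empty() {
                return; // all senders dropped, nothing more will arrive
            }
            for (operation, worker) in batch.drain(..) {
                println!("keep-alive for operation {operation} from worker {worker}");
            }
        }
    });

    for i in 0..3 {
        tx.send((i, 42)).expect("receiver still alive");
    }
    drop(tx); // closing the channel lets the drainer exit cleanly
    drainer.await.unwrap();
}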
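
remove_timedout_workers (source lines 594-629) depends on the same recency ordering: iter().rev() visits workers from least- to most-recently used, and map_while() stops at the first worker that is still fresh, so the scan rarely touches the whole map. A minimal standalone sketch, assuming the lru crate and made-up timestamps and timeout:

// Sketch only: timestamps and the 30-second timeout are made-up values.
use lru::LruCache;

fn main() {
    let worker_timeout_s: u64 = 30;
    let now_timestamp: u64 = 1_000;

    // Value = last_update_timestamp; later puts count as more recently used.
    let mut workers: LruCache<&str, u64> = LruCache::unbounded();
    workers.put("worker-a", 900); // timed out
    workers.put("worker-b", 960); // timed out
    workers.put("worker-c", 995); // still fresh

    let timed_out: Vec<&str> = workers
        .iter()
        .rev() // least recently used first
        .map_while(|(worker_id, last_update)| {
            (*last_update <= now_timestamp - worker_timeout_s).then_some(*worker_id)
        })
        .collect();

    assert_eq!(timed_out, vec!["worker-a", "worker-b"]);
}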