Coverage Report

Created: 2025-11-18 18:43

/build/source/nativelink-scheduler/src/api_worker_scheduler.rs
Line | Count | Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::ops::{Deref, DerefMut};
16
use std::sync::Arc;
17
18
use async_lock::Mutex;
19
use lru::LruCache;
20
use nativelink_config::schedulers::WorkerAllocationStrategy;
21
use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err};
22
use nativelink_metric::{
23
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
24
    RootMetricsComponent, group,
25
};
26
use nativelink_util::action_messages::{OperationId, WorkerId};
27
use nativelink_util::operation_state_manager::{UpdateOperationType, WorkerStateManager};
28
use nativelink_util::platform_properties::PlatformProperties;
29
use nativelink_util::shutdown_guard::ShutdownGuard;
30
use nativelink_util::spawn;
31
use nativelink_util::task::JoinHandleDropGuard;
32
use tokio::sync::Notify;
33
use tokio::sync::mpsc::{self, UnboundedSender};
34
use tonic::async_trait;
35
use tracing::{error, info, warn};
36
37
use crate::platform_property_manager::PlatformPropertyManager;
38
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp, WorkerUpdate};
39
use crate::worker_scheduler::WorkerScheduler;
40
41
#[derive(Debug)]
42
struct Workers(LruCache<WorkerId, Worker>);
43
44
impl Deref for Workers {
45
    type Target = LruCache<WorkerId, Worker>;
46
47
62
    fn deref(&self) -> &Self::Target {
48
62
        &self.0
49
62
    }
50
}
51
52
impl DerefMut for Workers {
53
125
    fn deref_mut(&mut self) -> &mut Self::Target {
54
125
        &mut self.0
55
125
    }
56
}
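
Aside: the newtype plus Deref/DerefMut pattern above is what lets the rest of this file call LruCache methods (put, pop, get_mut, peek_mut, iter) directly on Workers. A minimal, self-contained sketch of the same pattern, using a hypothetical Pool type with string values rather than anything from this crate:

use core::ops::{Deref, DerefMut};

use lru::LruCache;

// Hypothetical newtype wrapping an LruCache, mirroring `Workers` above.
struct Pool(LruCache<u64, String>);

impl Deref for Pool {
    type Target = LruCache<u64, String>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for Pool {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

fn main() {
    let mut pool = Pool(LruCache::unbounded());
    // LruCache methods are reachable directly through Deref/DerefMut,
    // while the inner cache stays accessible as `pool.0` when needed.
    pool.put(1, "worker-a".to_string());
    pool.put(2, "worker-b".to_string());
    assert!(pool.contains(&1));
    assert_eq!(pool.pop(&2).as_deref(), Some("worker-b"));
}
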
57
58
// Note: This could not use the derive macro because the derive macro
59
// does not support LruCache or structs with nameless (tuple) fields.
60
impl MetricsComponent for Workers {
61
0
    fn publish(
62
0
        &self,
63
0
        _kind: MetricKind,
64
0
        _field_metadata: MetricFieldData,
65
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
66
0
        let _enter = group!("workers").entered();
67
0
        for (worker_id, worker) in self.iter() {
68
0
            let _enter = group!(worker_id).entered();
69
0
            worker.publish(MetricKind::Component, MetricFieldData::default())?;
70
        }
71
0
        Ok(MetricPublishKnownKindData::Component)
72
0
    }
73
}
74
75
/// A collection of workers that are available to run tasks.
76
#[derive(MetricsComponent)]
77
struct ApiWorkerSchedulerImpl {
78
    /// A `LruCache` of workers available based on `allocation_strategy`.
79
    #[metric(group = "workers")]
80
    workers: Workers,
81
82
    /// The worker state manager.
83
    #[metric(group = "worker_state_manager")]
84
    worker_state_manager: Arc<dyn WorkerStateManager>,
85
    /// The allocation strategy for workers.
86
    allocation_strategy: WorkerAllocationStrategy,
87
    /// A channel to notify the matching engine that the worker pool has changed.
88
    worker_change_notify: Arc<Notify>,
89
    /// A channel to notify that an operation is still alive.
90
    operation_keep_alive_tx: UnboundedSender<(OperationId, WorkerId)>,
91
92
    /// Whether the worker scheduler is shutting down.
93
    shutting_down: bool,
94
}
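
Aside: worker_change_notify is the hook that wakes the matching engine whenever a method such as add_worker, remove_worker, or set_drain_worker changes the pool. A minimal sketch of that tokio::sync::Notify wake-up pattern, with a hypothetical matcher task standing in for the real matching engine:

use std::sync::Arc;

use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    // Hypothetical stand-in for `worker_change_notify`: writers signal that
    // the worker pool changed, and a matcher task waits for that signal.
    let notify = Arc::new(Notify::new());
    let waiter = Arc::clone(&notify);

    let matcher = tokio::spawn(async move {
        // notified() completes once notify_one() has been called, even if the
        // call happened before we started waiting (a single permit is stored).
        waiter.notified().await;
        println!("worker pool changed, re-running matching");
    });

    // Rough equivalent of `self.worker_change_notify.notify_one()` after a pool change.
    notify.notify_one();
    matcher.await.unwrap();
}
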
95
96
impl core::fmt::Debug for ApiWorkerSchedulerImpl {
97
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
98
0
        f.debug_struct("ApiWorkerSchedulerImpl")
99
0
            .field("workers", &self.workers)
100
0
            .field("allocation_strategy", &self.allocation_strategy)
101
0
            .field("worker_change_notify", &self.worker_change_notify)
102
0
            .field("operation_keep_alive_tx", &self.operation_keep_alive_tx)
103
0
            .finish_non_exhaustive()
104
0
    }
105
}
106
107
impl ApiWorkerSchedulerImpl {
108
    /// Refreshes the lifetime of the worker with the given timestamp.
109
2
    fn refresh_lifetime(
110
2
        &mut self,
111
2
        worker_id: &WorkerId,
112
2
        timestamp: WorkerTimestamp,
113
2
    ) -> Result<(), Error> {
114
2
        let worker = self.workers.0.peek_mut(worker_id).ok_or_else(|| {
115
0
            make_input_err!(
116
                "Worker not found in worker map in refresh_lifetime() {}",
117
                worker_id
118
            )
119
0
        })?;
120
0
        error_if!(
121
2
            worker.last_update_timestamp > timestamp,
  Branch (121:13): [True: 0, False: 2]
  Branch (121:13): [Folded - Ignored]
122
            "Worker already had a timestamp of {}, but tried to update it with {}",
123
            worker.last_update_timestamp,
124
            timestamp
125
        );
126
2
        worker.last_update_timestamp = timestamp;
127
2
        for operation_id in worker.running_action_infos.keys() {
128
0
            if self
  Branch (128:16): [True: 0, False: 0]
  Branch (128:16): [Folded - Ignored]
129
0
                .operation_keep_alive_tx
130
0
                .send((operation_id.clone(), worker_id.clone()))
131
0
                .is_err()
132
            {
133
0
                error!(
134
                    %operation_id,
135
                    ?worker_id,
136
0
                    "OperationKeepAliveTx stream closed"
137
                );
138
0
            }
139
        }
140
2
        Ok(())
141
2
    }
142
143
    /// Adds a worker to the pool.
144
    /// Note: This function will not do any task matching.
145
35
    fn add_worker(&mut self, worker: Worker) -> Result<(), Error> {
146
35
        let worker_id = worker.id.clone();
147
35
        self.workers.put(worker_id.clone(), worker);
148
149
        // Worker is not cloneable, and we do not want to send the initial connection results until
150
        // we have added it to the map, or we might get some strange race conditions due to the way
151
        // the multi-threaded runtime works.
152
35
        let worker = self.workers.peek_mut(&worker_id).unwrap();
153
35
        let res = worker
154
35
            .send_initial_connection_result()
155
35
            .err_tip(|| "Failed to send initial connection result to worker");
156
35
        if let Err(err) = &res {
  Branch (156:16): [True: 0, False: 35]
  Branch (156:16): [Folded - Ignored]
157
0
            error!(
158
                ?worker_id,
159
                ?err,
160
0
                "Worker connection appears to have been closed while adding to pool"
161
            );
162
35
        }
163
35
        self.worker_change_notify.notify_one();
164
35
        res
165
35
    }
166
167
    /// Removes worker from pool.
168
    /// Note: The caller is responsible for rescheduling any tasks that might be
169
    /// running.
170
10
    fn remove_worker(&mut self, worker_id: &WorkerId) -> Option<Worker> {
171
10
        let result = self.workers.pop(worker_id);
172
10
        self.worker_change_notify.notify_one();
173
10
        result
174
10
    }
175
176
    /// Sets if the worker is draining or not.
177
2
    async fn set_drain_worker(
178
2
        &mut self,
179
2
        worker_id: &WorkerId,
180
2
        is_draining: bool,
181
2
    ) -> Result<(), Error> {
182
2
        let worker = self
183
2
            .workers
184
2
            .get_mut(worker_id)
185
2
            .err_tip(|| format!("Worker {worker_id} doesn't exist in the pool"))?;
186
2
        worker.is_draining = is_draining;
187
2
        self.worker_change_notify.notify_one();
188
2
        Ok(())
189
2
    }
190
191
38
    fn inner_worker_checker(
192
38
        (worker_id, w): &(&WorkerId, &Worker),
193
38
        platform_properties: &PlatformProperties,
194
38
        full_worker_logging: bool,
195
38
    ) -> bool {
196
38
        if !w.can_accept_work() {
  Branch (196:12): [True: 1, False: 37]
  Branch (196:12): [Folded - Ignored]
197
1
            if full_worker_logging {
  Branch (197:16): [True: 1, False: 0]
  Branch (197:16): [Folded - Ignored]
198
1
                info!(
199
1
                    "Worker {worker_id} cannot accept work because is_paused: {}, is_draining: {}",
200
                    w.is_paused, w.is_draining
201
                );
202
0
            }
203
1
            false
204
37
        } else if !platform_properties.is_satisfied_by(&w.platform_properties, full_worker_logging)
  Branch (204:19): [True: 5, False: 32]
  Branch (204:19): [Folded - Ignored]
205
        {
206
5
            if full_worker_logging {
  Branch (206:16): [True: 5, False: 0]
  Branch (206:16): [Folded - Ignored]
207
5
                info!("Worker {worker_id} properties are insufficient");
208
0
            }
209
5
            false
210
        } else {
211
32
            true
212
        }
213
38
    }
214
215
50
    fn inner_find_worker_for_action(
216
50
        &self,
217
50
        platform_properties: &PlatformProperties,
218
50
        full_worker_logging: bool,
219
50
    ) -> Option<WorkerId> {
220
50
        let mut workers_iter = self.workers.iter();
221
50
        let workers_iter = match self.allocation_strategy {
222
            // Use rfind to get the least recently used that satisfies the properties.
223
50
            WorkerAllocationStrategy::LeastRecentlyUsed => workers_iter.rfind(|worker| {
224
38
                Self::inner_worker_checker(worker, platform_properties, full_worker_logging)
225
38
            }),
226
            // Use find to get the most recently used that satisfies the properties.
227
0
            WorkerAllocationStrategy::MostRecentlyUsed => workers_iter.find(|worker| {
228
0
                Self::inner_worker_checker(worker, platform_properties, full_worker_logging)
229
0
            }),
230
        };
231
50
        workers_iter.map(|(_, w)| w.id.clone())
232
50
    }
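
Aside: per the comments above, the two allocation strategies differ only in the direction of the scan over the LruCache iterator: find() takes the first satisfying worker from the most recently used end, rfind() from the least recently used end. A small sketch with hypothetical worker ids and a single "cores" value standing in for PlatformProperties:

use lru::LruCache;

fn main() {
    // Hypothetical worker pool keyed by id; value = available CPU cores.
    let mut workers: LruCache<&str, u32> = LruCache::unbounded();
    workers.put("worker-a", 8); // least recently used after the later puts
    workers.put("worker-b", 2);
    workers.put("worker-c", 8); // most recently used

    let needs_cores = 4;

    // "MostRecentlyUsed": first match walking from the MRU end.
    let mru = workers.iter().find(|(_, cores)| **cores >= needs_cores);
    // "LeastRecentlyUsed": first match walking from the LRU end.
    let lru = workers.iter().rfind(|(_, cores)| **cores >= needs_cores);

    assert_eq!(mru.map(|(id, _)| *id), Some("worker-c"));
    assert_eq!(lru.map(|(id, _)| *id), Some("worker-a"));
}
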
233
234
10
    async fn update_action(
235
10
        &mut self,
236
10
        worker_id: &WorkerId,
237
10
        operation_id: &OperationId,
238
10
        update: UpdateOperationType,
239
10
    ) -> Result<(), Error> {
240
10
        let worker = self.workers.get_mut(worker_id).err_tip(|| {
241
0
            format!("Worker {worker_id} does not exist in SimpleScheduler::update_action")
242
0
        })?;
243
244
        // Ensure the worker is supposed to be running the operation.
245
10
        if !worker.running_action_infos.contains_key(operation_id) {
  Branch (245:12): [True: 1, False: 9]
  Branch (245:12): [Folded - Ignored]
246
1
            let err = make_err!(
247
1
                Code::Internal,
248
                "Operation {operation_id} should not be running on worker {worker_id} in SimpleScheduler::update_action"
249
            );
250
1
            return Result::<(), _>::Err(err.clone())
251
1
                .merge(self.immediate_evict_worker(worker_id, err, false).await);
252
9
        }
253
254
9
        let (is_finished, due_to_backpressure) = match &update {
255
6
            UpdateOperationType::UpdateWithActionStage(action_stage) => {
256
6
                (action_stage.is_finished(), false)
257
            }
258
0
            UpdateOperationType::KeepAlive => (false, false),
259
3
            UpdateOperationType::UpdateWithError(err) => {
260
3
                (true, err.code == Code::ResourceExhausted)
261
            }
262
0
            UpdateOperationType::UpdateWithDisconnect => (true, false),
263
            UpdateOperationType::ExecutionComplete => {
264
                // No update here, just restoring platform properties.
265
0
                worker.execution_complete(operation_id);
266
0
                self.worker_change_notify.notify_one();
267
0
                return Ok(());
268
            }
269
        };
270
271
        // Update the operation in the worker state manager.
272
        {
273
9
            let update_operation_res = self
274
9
                .worker_state_manager
275
9
                .update_operation(operation_id, worker_id, update)
276
9
                .await
277
8
                .err_tip(|| "in update_operation on SimpleScheduler::update_action");
278
8
            if let Err(err) = update_operation_res {
  Branch (278:20): [True: 0, False: 8]
  Branch (278:20): [Folded - Ignored]
279
0
                error!(
280
                    %operation_id,
281
                    ?worker_id,
282
                    ?err,
283
0
                    "Failed to update_operation on update_action"
284
                );
285
0
                return Err(err);
286
8
            }
287
        }
288
289
8
        if !is_finished {
  Branch (289:12): [True: 0, False: 8]
  Branch (289:12): [Folded - Ignored]
290
0
            return Ok(());
291
8
        }
292
293
        // Clear this action from the current worker if finished.
294
8
        let complete_action_res = {
295
8
            let was_paused = !worker.can_accept_work();
296
297
            // Note: We need to run this before dealing with backpressure logic.
298
8
            let complete_action_res = worker.complete_action(operation_id).await;
299
300
            // Only pause if there's an action still waiting that will unpause.
301
8
            if (was_paused || due_to_backpressure) && worker.has_actions() {
  Branch (301:17): [True: 0, False: 8]
  Branch (301:31): [True: 0, False: 8]
  Branch (301:55): [True: 0, False: 0]
  Branch (301:17): [Folded - Ignored]
  Branch (301:31): [Folded - Ignored]
  Branch (301:55): [Folded - Ignored]
302
0
                worker.is_paused = true;
303
8
            }
304
8
            complete_action_res
305
        };
306
307
8
        self.worker_change_notify.notify_one();
308
309
8
        complete_action_res
310
9
    }
311
312
    /// Notifies the specified worker to run the given action and handles errors by evicting
313
    /// the worker if the notification fails.
314
32
    async fn worker_notify_run_action(
315
32
        &mut self,
316
32
        worker_id: WorkerId,
317
32
        operation_id: OperationId,
318
32
        action_info: ActionInfoWithProps,
319
32
    ) -> Result<(), Error> {
320
32
        if let Some(worker) = self.workers.get_mut(&worker_id) {
  Branch (320:16): [True: 0, False: 0]
  Branch (320:16): [Folded - Ignored]
  Branch (320:16): [True: 4, False: 0]
  Branch (320:16): [True: 27, False: 0]
  Branch (320:16): [True: 1, False: 0]
321
32
            let notify_worker_result = worker
322
32
                .notify_update(WorkerUpdate::RunAction((operation_id, action_info.clone())))
323
32
                .await;
324
325
32
            if let Err(notify_worker_result) = notify_worker_result {
  Branch (325:20): [True: 0, False: 0]
  Branch (325:20): [Folded - Ignored]
  Branch (325:20): [True: 0, False: 4]
  Branch (325:20): [True: 1, False: 26]
  Branch (325:20): [True: 0, False: 1]
326
1
                warn!(
327
                    ?worker_id,
328
                    ?action_info,
329
                    ?notify_worker_result,
330
1
                    "Worker command failed, removing worker",
331
                );
332
333
                // A slightly nasty way of figuring out that the worker disconnected
334
                // from send_msg_to_worker without introducing complexity to the
335
                // code path from here to there.
336
1
                let is_disconnect = notify_worker_result.code == Code::Internal
  Branch (336:37): [True: 0, False: 0]
  Branch (336:37): [Folded - Ignored]
  Branch (336:37): [True: 0, False: 0]
  Branch (336:37): [True: 1, False: 0]
  Branch (336:37): [True: 0, False: 0]
337
1
                    && notify_worker_result.messages.len() == 1
  Branch (337:24): [True: 0, False: 0]
  Branch (337:24): [Folded - Ignored]
  Branch (337:24): [True: 0, False: 0]
  Branch (337:24): [True: 1, False: 0]
  Branch (337:24): [True: 0, False: 0]
338
1
                    && notify_worker_result.messages[0] == "Worker Disconnected";
339
340
1
                let err = make_err!(
341
1
                    Code::Internal,
342
                    "Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}",
343
                );
344
345
1
                return Result::<(), _>::Err(err.clone()).merge(
346
1
                    self.immediate_evict_worker(&worker_id, err, is_disconnect)
347
1
                        .await,
348
                );
349
31
            }
350
31
            Ok(())
351
        } else {
352
0
            warn!(
353
                ?worker_id,
354
                %operation_id,
355
                ?action_info,
356
0
                "Worker not found in worker map in worker_notify_run_action"
357
            );
358
            // Ensure the operation is put back to queued state.
359
0
            self.worker_state_manager
360
0
                .update_operation(
361
0
                    &operation_id,
362
0
                    &worker_id,
363
0
                    UpdateOperationType::UpdateWithDisconnect,
364
0
                )
365
0
                .await
366
        }
367
32
    }
368
369
    /// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it.
370
10
    async fn immediate_evict_worker(
371
10
        &mut self,
372
10
        worker_id: &WorkerId,
373
10
        err: Error,
374
10
        is_disconnect: bool,
375
10
    ) -> Result<(), Error> {
376
10
        let mut result = Ok(());
377
10
        if let Some(mut worker) = self.remove_worker(worker_id) {
  Branch (377:16): [True: 9, False: 0]
  Branch (377:16): [Folded - Ignored]
  Branch (377:16): [True: 0, False: 0]
  Branch (377:16): [True: 1, False: 0]
  Branch (377:16): [True: 0, False: 0]
378
            // We don't care if we fail to send a message to the worker; this is only a best-effort attempt.
379
10
            drop(worker.notify_update(WorkerUpdate::Disconnect).await);
380
10
            let update = if is_disconnect {
  Branch (380:29): [True: 0, False: 9]
  Branch (380:29): [Folded - Ignored]
  Branch (380:29): [True: 0, False: 0]
  Branch (380:29): [True: 0, False: 1]
  Branch (380:29): [True: 0, False: 0]
381
0
                UpdateOperationType::UpdateWithDisconnect
382
            } else {
383
10
                UpdateOperationType::UpdateWithError(err)
384
            };
385
10
            for (operation_id, _) in worker.running_action_infos.drain() {
386
8
                result = result.merge(
387
8
                    self.worker_state_manager
388
8
                        .update_operation(&operation_id, worker_id, update.clone())
389
8
                        .await,
390
                );
391
            }
392
0
        }
393
        // Note: Calling this many times is very cheap, it'll only trigger `do_try_match` once.
394
        // TODO(palfrey) This should be moved to inside the Workers struct.
395
10
        self.worker_change_notify.notify_one();
396
10
        result
397
10
    }
398
}
399
400
#[derive(Debug, MetricsComponent)]
401
pub struct ApiWorkerScheduler {
402
    #[metric]
403
    inner: Mutex<ApiWorkerSchedulerImpl>,
404
    #[metric(group = "platform_property_manager")]
405
    platform_property_manager: Arc<PlatformPropertyManager>,
406
407
    #[metric(
408
        help = "Timeout of how long to evict workers if no response in this given amount of time in seconds."
409
    )]
410
    worker_timeout_s: u64,
411
    _operation_keep_alive_spawn: JoinHandleDropGuard<()>,
412
}
413
414
impl ApiWorkerScheduler {
415
30
    pub fn new(
416
30
        worker_state_manager: Arc<dyn WorkerStateManager>,
417
30
        platform_property_manager: Arc<PlatformPropertyManager>,
418
30
        allocation_strategy: WorkerAllocationStrategy,
419
30
        worker_change_notify: Arc<Notify>,
420
30
        worker_timeout_s: u64,
421
30
    ) -> Arc<Self> {
422
30
        let (operation_keep_alive_tx, mut operation_keep_alive_rx) = mpsc::unbounded_channel();
423
30
        Arc::new(Self {
424
30
            inner: Mutex::new(ApiWorkerSchedulerImpl {
425
30
                workers: Workers(LruCache::unbounded()),
426
30
                worker_state_manager: worker_state_manager.clone(),
427
30
                allocation_strategy,
428
30
                worker_change_notify,
429
30
                operation_keep_alive_tx,
430
30
                shutting_down: false,
431
30
            }),
432
30
            platform_property_manager,
433
30
            worker_timeout_s,
434
30
            _operation_keep_alive_spawn: spawn!(
435
                "simple_scheduler_operation_keep_alive",
436
25
                async move {
437
                    const RECV_MANY_LIMIT: usize = 256;
438
25
                    let mut messages = Vec::with_capacity(RECV_MANY_LIMIT);
439
                    loop {
440
25
                        messages.clear();
441
25
                        operation_keep_alive_rx
442
25
                            .recv_many(&mut messages, RECV_MANY_LIMIT)
443
25
                            .await;
444
0
                        if messages.is_empty() {
  Branch (444:28): [True: 0, False: 0]
  Branch (444:28): [Folded - Ignored]
445
0
                            return; // Looks like our sender has been dropped.
446
0
                        }
447
0
                        for (operation_id, worker_id) in messages.drain(..) {
448
0
                            let update_operation_res = worker_state_manager
449
0
                                .update_operation(
450
0
                                    &operation_id,
451
0
                                    &worker_id,
452
0
                                    UpdateOperationType::KeepAlive,
453
0
                                )
454
0
                                .await;
455
0
                            if let Err(err) = update_operation_res {
  Branch (455:36): [True: 0, False: 0]
  Branch (455:36): [Folded - Ignored]
456
0
                                warn!(
457
                                    ?err,
458
0
                                    "Error while running worker_keep_alive_received, maybe job is done?"
459
                                );
460
0
                            }
461
                        }
462
                    }
463
0
                }
464
            ),
465
        })
466
30
    }
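
Aside: the keep-alive pipeline built in new() is a simple batching consumer: refresh_lifetime() pushes (OperationId, WorkerId) pairs into the unbounded channel, and the spawned task drains up to 256 of them at a time with recv_many, exiting once every sender is gone. A standalone sketch of that pattern, using u32 messages in place of the real (OperationId, WorkerId) pairs:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    const RECV_MANY_LIMIT: usize = 256;

    // Hypothetical keep-alive channel; the real one carries (OperationId, WorkerId).
    let (tx, mut rx) = mpsc::unbounded_channel::<u32>();

    let consumer = tokio::spawn(async move {
        let mut messages = Vec::with_capacity(RECV_MANY_LIMIT);
        loop {
            messages.clear();
            // Waits for at least one message, then takes up to the limit.
            rx.recv_many(&mut messages, RECV_MANY_LIMIT).await;
            if messages.is_empty() {
                // All senders dropped and the channel is drained; nothing more will arrive.
                return;
            }
            println!("processing a batch of {} keep-alives", messages.len());
        }
    });

    for id in 0..10 {
        tx.send(id).expect("receiver is alive");
    }
    drop(tx); // Close the channel so the consumer exits after draining.
    consumer.await.unwrap();
}
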
467
468
32
    pub async fn worker_notify_run_action(
469
32
        &self,
470
32
        worker_id: WorkerId,
471
32
        operation_id: OperationId,
472
32
        action_info: ActionInfoWithProps,
473
32
    ) -> Result<(), Error> {
474
32
        let mut inner = self.inner.lock().await;
475
32
        inner
476
32
            .worker_notify_run_action(worker_id, operation_id, action_info)
477
32
            .await
478
32
    }
479
480
    /// Attempts to find a worker that is capable of running this action.
481
    // TODO(palfrey) This algorithm is not very efficient. Simple testing using a tree-like
482
    // structure showed worse performance on a 10_000 worker * 7 properties * 1000 queued tasks
483
    // simulation of worst cases in a single threaded environment.
484
50
    pub async fn find_worker_for_action(
485
50
        &self,
486
50
        platform_properties: &PlatformProperties,
487
50
        full_worker_logging: bool,
488
50
    ) -> Option<WorkerId> {
489
50
        let inner = self.inner.lock().await;
490
50
        inner.inner_find_worker_for_action(platform_properties, full_worker_logging)
491
50
    }
492
493
    /// Checks to see if the worker exists in the worker pool. Should only be used in unit tests.
494
    #[must_use]
495
7
    pub async fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool {
496
7
        let inner = self.inner.lock().await;
497
7
        inner.workers.contains(worker_id)
498
7
    }
499
500
    /// A unit test function used to send the keep alive message to the worker from the server.
501
0
    pub async fn send_keep_alive_to_worker_for_test(
502
0
        &self,
503
0
        worker_id: &WorkerId,
504
1
    ) -> Result<(), Error> {
505
1
        let mut inner = self.inner.lock().await;
506
1
        let worker = inner.workers.get_mut(worker_id).ok_or_else(|| {
507
0
            make_input_err!("WorkerId '{}' does not exist in workers map", worker_id)
508
0
        })?;
509
1
        worker.keep_alive()
510
1
    }
511
}
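
Aside: ApiWorkerScheduler itself holds no mutable state; its public async methods lock the single async_lock::Mutex around ApiWorkerSchedulerImpl and delegate. A stripped-down sketch of that wrapper-around-a-mutexed-impl layout, using a hypothetical Counter/CounterImpl pair:

use std::sync::Arc;

use async_lock::Mutex;

// Hypothetical inner type: all mutable state lives here, mirroring ApiWorkerSchedulerImpl.
#[derive(Default)]
struct CounterImpl {
    value: u64,
}

impl CounterImpl {
    fn increment(&mut self) -> u64 {
        self.value += 1;
        self.value
    }
}

// Hypothetical public wrapper, mirroring ApiWorkerScheduler: it only locks
// the mutex and delegates to the inner implementation.
struct Counter {
    inner: Mutex<CounterImpl>,
}

impl Counter {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            inner: Mutex::new(CounterImpl::default()),
        })
    }

    async fn increment(&self) -> u64 {
        // The guard is dropped when the call returns, keeping the lock scope short.
        let mut inner = self.inner.lock().await;
        inner.increment()
    }
}

#[tokio::main]
async fn main() {
    let counter = Counter::new();
    assert_eq!(counter.increment().await, 1);
    assert_eq!(counter.increment().await, 2);
}
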
512
513
#[async_trait]
514
impl WorkerScheduler for ApiWorkerScheduler {
515
1
    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
516
1
        self.platform_property_manager.as_ref()
517
1
    }
518
519
35
    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
520
        let worker_id = worker.id.clone();
521
        let mut inner = self.inner.lock().await;
522
        if inner.shutting_down {
523
            warn!("Rejected worker add during shutdown: {}", worker_id);
524
            return Err(make_err!(
525
                Code::Unavailable,
526
                "Received request to add worker while shutting down"
527
            ));
528
        }
529
        let result = inner
530
            .add_worker(worker)
531
            .err_tip(|| "Error while adding worker, removing from pool");
532
        if let Err(err) = result {
533
            return Result::<(), _>::Err(err.clone())
534
                .merge(inner.immediate_evict_worker(&worker_id, err, false).await);
535
        }
536
        Ok(())
537
35
    }
538
539
    async fn update_action(
540
        &self,
541
        worker_id: &WorkerId,
542
        operation_id: &OperationId,
543
        update: UpdateOperationType,
544
10
    ) -> Result<(), Error> {
545
        let mut inner = self.inner.lock().await;
546
        inner.update_action(worker_id, operation_id, update).await
547
10
    }
548
549
    async fn worker_keep_alive_received(
550
        &self,
551
        worker_id: &WorkerId,
552
        timestamp: WorkerTimestamp,
553
2
    ) -> Result<(), Error> {
554
        let mut inner = self.inner.lock().await;
555
        inner
556
            .refresh_lifetime(worker_id, timestamp)
557
            .err_tip(|| "Error refreshing lifetime in worker_keep_alive_received()")
558
2
    }
559
560
6
    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
561
        let mut inner = self.inner.lock().await;
562
        inner
563
            .immediate_evict_worker(
564
                worker_id,
565
                make_err!(Code::Internal, "Received request to remove worker"),
566
                false,
567
            )
568
            .await
569
6
    }
570
571
0
    async fn shutdown(&self, shutdown_guard: ShutdownGuard) {
572
        let mut inner = self.inner.lock().await;
573
        inner.shutting_down = true; // should reject further worker registration
574
        while let Some(worker_id) = inner
575
            .workers
576
            .peek_lru()
577
0
            .map(|(worker_id, _worker)| worker_id.clone())
578
        {
579
            if let Err(err) = inner
580
                .immediate_evict_worker(
581
                    &worker_id,
582
                    make_err!(Code::Internal, "Scheduler shutdown"),
583
                    true,
584
                )
585
                .await
586
            {
587
                error!(?err, "Error evicting worker on shutdown.");
588
            }
589
        }
590
        drop(shutdown_guard);
591
0
    }
592
593
5
    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
594
        let mut inner = self.inner.lock().await;
595
596
        let mut result = Ok(());
597
        // Items should be sorted based on last_update_timestamp, so we don't need to iterate the entire
598
        // map most of the time.
599
        let worker_ids_to_remove: Vec<WorkerId> = inner
600
            .workers
601
            .iter()
602
            .rev()
603
6
            .map_while(|(worker_id, worker)| {
604
6
                if worker.last_update_timestamp <= now_timestamp - self.worker_timeout_s {
  Branch (604:20): [True: 2, False: 4]
  Branch (604:20): [Folded - Ignored]
605
2
                    Some(worker_id.clone())
606
                } else {
607
4
                    None
608
                }
609
6
            })
610
            .collect();
611
        for worker_id in &worker_ids_to_remove {
612
            warn!(?worker_id, "Worker timed out, removing from pool");
613
            result = result.merge(
614
                inner
615
                    .immediate_evict_worker(
616
                        worker_id,
617
                        make_err!(
618
                            Code::Internal,
619
                            "Worker {worker_id} timed out, removing from pool"
620
                        ),
621
                        false,
622
                    )
623
                    .await,
624
            );
625
        }
626
627
        result
628
5
    }
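
Aside: remove_timedout_workers relies on LRU order as a proxy for last_update_timestamp order (see the comment above): it walks the cache from the least recently used end, and map_while stops the scan at the first worker that is still fresh. A minimal sketch with hypothetical timestamps:

use lru::LruCache;

fn main() {
    let now: u64 = 100;
    let timeout_s: u64 = 30;

    // Hypothetical pool: value = last_update_timestamp (seconds).
    let mut workers: LruCache<&str, u64> = LruCache::unbounded();
    workers.put("worker-a", 40); // stale, least recently used
    workers.put("worker-b", 60); // stale
    workers.put("worker-c", 95); // fresh, most recently used

    // Walk LRU -> MRU and stop at the first fresh worker; entries roughly
    // follow recency, so everything after that point is assumed fresh too.
    let timed_out: Vec<&str> = workers
        .iter()
        .rev()
        .map_while(|(id, last_update)| {
            if *last_update <= now - timeout_s {
                Some(*id)
            } else {
                None
            }
        })
        .collect();

    assert_eq!(timed_out, vec!["worker-a", "worker-b"]);
}
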
629
630
2
    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
631
        let mut inner = self.inner.lock().await;
632
        inner.set_drain_worker(worker_id, is_draining).await
633
2
    }
634
}
635
636
impl RootMetricsComponent for ApiWorkerScheduler {}