Coverage Report

Created: 2025-09-11 14:14

/build/source/nativelink-scheduler/src/api_worker_scheduler.rs
Line | Count | Source
   1 |       | // Copyright 2024 The NativeLink Authors. All rights reserved.
   2 |       | //
   3 |       | // Licensed under the Apache License, Version 2.0 (the "License");
   4 |       | // you may not use this file except in compliance with the License.
   5 |       | // You may obtain a copy of the License at
   6 |       | //
   7 |       | //    http://www.apache.org/licenses/LICENSE-2.0
   8 |       | //
   9 |       | // Unless required by applicable law or agreed to in writing, software
  10 |       | // distributed under the License is distributed on an "AS IS" BASIS,
  11 |       | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12 |       | // See the License for the specific language governing permissions and
  13 |       | // limitations under the License.
  14 |       |
  15 |       | use core::ops::{Deref, DerefMut};
  16 |       | use std::collections::{HashMap, HashSet};
  17 |       | use std::sync::Arc;
  18 |       |
  19 |       | use async_lock::Mutex;
  20 |       | use lru::LruCache;
  21 |       | use nativelink_config::schedulers::WorkerAllocationStrategy;
  22 |       | use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err};
  23 |       | use nativelink_metric::{
  24 |       |     MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
  25 |       |     RootMetricsComponent, group,
  26 |       | };
  27 |       | use nativelink_util::action_messages::{OperationId, WorkerId};
  28 |       | use nativelink_util::operation_state_manager::{UpdateOperationType, WorkerStateManager};
  29 |       | use nativelink_util::platform_properties::PlatformProperties;
  30 |       | use nativelink_util::shutdown_guard::ShutdownGuard;
  31 |       | use nativelink_util::spawn;
  32 |       | use nativelink_util::task::JoinHandleDropGuard;
  33 |       | use tokio::sync::Notify;
  34 |       | use tokio::sync::mpsc::{self, UnboundedSender};
  35 |       | use tonic::async_trait;
  36 |       | #[cfg(feature = "worker_find_logging")]
  37 |       | use tracing::info;
  38 |       | use tracing::{error, warn};
  39 |       |
  40 |       | use crate::platform_property_manager::PlatformPropertyManager;
  41 |       | use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp, WorkerUpdate};
  42 |       | use crate::worker_scheduler::WorkerScheduler;
  43 |       |
  44 |       | #[derive(Debug)]
  45 |       | struct Workers(LruCache<WorkerId, Worker>);
  46 |       |
  47 |       | impl Deref for Workers {
  48 |       |     type Target = LruCache<WorkerId, Worker>;
  49 |       |
  50 |    62 |     fn deref(&self) -> &Self::Target {
  51 |    62 |         &self.0
  52 |    62 |     }
  53 |       | }
  54 |       |
  55 |       | impl DerefMut for Workers {
  56 |   124 |     fn deref_mut(&mut self) -> &mut Self::Target {
  57 |   124 |         &mut self.0
  58 |   124 |     }
  59 |       | }
  60 |       |
  61 |       | // Note: This could not be a derive macro because this derive-macro
  62 |       | // does not support LruCache and nameless field structs.
  63 |       | impl MetricsComponent for Workers {
  64 |     0 |     fn publish(
  65 |     0 |         &self,
  66 |     0 |         _kind: MetricKind,
  67 |     0 |         _field_metadata: MetricFieldData,
  68 |     0 |     ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
  69 |     0 |         let _enter = group!("workers").entered();
  70 |     0 |         for (worker_id, worker) in self.iter() {
  71 |     0 |             let _enter = group!(worker_id).entered();
  72 |     0 |             worker.publish(MetricKind::Component, MetricFieldData::default())?;
  73 |       |         }
  74 |     0 |         Ok(MetricPublishKnownKindData::Component)
  75 |     0 |     }
  76 |       | }
  77 |       |
  78 |       | /// A collection of workers that are available to run tasks.
  79 |       | #[derive(MetricsComponent)]
  80 |       | struct ApiWorkerSchedulerImpl {
  81 |       |     /// A `LruCache` of workers available based on `allocation_strategy`.
  82 |       |     #[metric(group = "workers")]
  83 |       |     workers: Workers,
  84 |       |
  85 |       |     /// A set of operations that have notified completion, but are still
  86 |       |     /// uploading results.
  87 |       |     pending_results: HashMap<WorkerId, HashSet<OperationId>>,
  88 |       |
  89 |       |     /// The worker state manager.
  90 |       |     #[metric(group = "worker_state_manager")]
  91 |       |     worker_state_manager: Arc<dyn WorkerStateManager>,
  92 |       |     /// The allocation strategy for workers.
  93 |       |     allocation_strategy: WorkerAllocationStrategy,
  94 |       |     /// A channel to notify the matching engine that the worker pool has changed.
  95 |       |     worker_change_notify: Arc<Notify>,
  96 |       |     /// A channel to notify that an operation is still alive.
  97 |       |     operation_keep_alive_tx: UnboundedSender<(OperationId, WorkerId)>,
  98 |       | }
  99 |       |
 100 |       | impl core::fmt::Debug for ApiWorkerSchedulerImpl {
 101 |     0 |     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
 102 |     0 |         f.debug_struct("ApiWorkerSchedulerImpl")
 103 |     0 |             .field("workers", &self.workers)
 104 |     0 |             .field("allocation_strategy", &self.allocation_strategy)
 105 |     0 |             .field("worker_change_notify", &self.worker_change_notify)
 106 |     0 |             .field("operation_keep_alive_tx", &self.operation_keep_alive_tx)
 107 |     0 |             .finish_non_exhaustive()
 108 |     0 |     }
 109 |       | }
 110 |       |
 111 |       | impl ApiWorkerSchedulerImpl {
 112 |       |     /// Refreshes the lifetime of the worker with the given timestamp.
 113 |     2 |     fn refresh_lifetime(
 114 |     2 |         &mut self,
 115 |     2 |         worker_id: &WorkerId,
 116 |     2 |         timestamp: WorkerTimestamp,
 117 |     2 |     ) -> Result<(), Error> {
 118 |     2 |         let worker = self.workers.0.peek_mut(worker_id).ok_or_else(|| {
 119 |     0 |             make_input_err!(
 120 |       |                 "Worker not found in worker map in refresh_lifetime() {}",
 121 |       |                 worker_id
 122 |       |             )
 123 |     0 |         })?;
 124 |     0 |         error_if!(
 125 |     2 |             worker.last_update_timestamp > timestamp,
     |       |   Branch (125:13): [True: 0, False: 2]
     |       |   Branch (125:13): [Folded - Ignored]
 126 |       |             "Worker already had a timestamp of {}, but tried to update it with {}",
 127 |       |             worker.last_update_timestamp,
 128 |       |             timestamp
 129 |       |         );
 130 |     2 |         worker.last_update_timestamp = timestamp;
 131 |     2 |         for operation_id in worker.running_action_infos.keys() {
 132 |     0 |             if self
     |       |   Branch (132:16): [True: 0, False: 0]
     |       |   Branch (132:16): [Folded - Ignored]
 133 |     0 |                 .operation_keep_alive_tx
 134 |     0 |                 .send((operation_id.clone(), worker_id.clone()))
 135 |     0 |                 .is_err()
 136 |       |             {
 137 |     0 |                 error!(
 138 |       |                     ?operation_id,
 139 |       |                     ?worker_id,
 140 |     0 |                     "OperationKeepAliveTx stream closed"
 141 |       |                 );
 142 |     0 |             }
 143 |       |         }
 144 |     2 |         Ok(())
 145 |     2 |     }
 146 |       |
 147 |       |     /// Adds a worker to the pool.
 148 |       |     /// Note: This function will not do any task matching.
 149 |    35 |     fn add_worker(&mut self, worker: Worker) -> Result<(), Error> {
 150 |    35 |         let worker_id = worker.id.clone();
 151 |    35 |         self.workers.put(worker_id.clone(), worker);
 152 |       |
 153 |       |         // Worker is not cloneable, and we do not want to send the initial connection results until
 154 |       |         // we have added it to the map, or we might get some strange race conditions due to the way
 155 |       |         // the multi-threaded runtime works.
 156 |    35 |         let worker = self.workers.peek_mut(&worker_id).unwrap();
 157 |    35 |         let res = worker
 158 |    35 |             .send_initial_connection_result()
 159 |    35 |             .err_tip(|| "Failed to send initial connection result to worker");
 160 |    35 |         if let Err(err) = &res {
     |       |   Branch (160:16): [True: 0, False: 35]
     |       |   Branch (160:16): [Folded - Ignored]
 161 |     0 |             error!(
 162 |       |                 ?worker_id,
 163 |       |                 ?err,
 164 |     0 |                 "Worker connection appears to have been closed while adding to pool"
 165 |       |             );
 166 |    35 |         }
 167 |    35 |         self.worker_change_notify.notify_one();
 168 |    35 |         res
 169 |    35 |     }
 170 |       |
 171 |       |     /// Removes worker from pool.
 172 |       |     /// Note: The caller is responsible for any rescheduling of any tasks that might be
 173 |       |     /// running.
 174 |     9 |     fn remove_worker(&mut self, worker_id: &WorkerId) -> Option<Worker> {
 175 |     9 |         let result = self.workers.pop(worker_id);
 176 |     9 |         self.pending_results.remove(worker_id);
 177 |     9 |         self.worker_change_notify.notify_one();
 178 |     9 |         result
 179 |     9 |     }
 180 |       |
 181 |       |     /// Sets if the worker is draining or not.
 182 |     2 |     async fn set_drain_worker(
 183 |     2 |         &mut self,
 184 |     2 |         worker_id: &WorkerId,
 185 |     2 |         is_draining: bool,
 186 |     2 |     ) -> Result<(), Error> {
 187 |     2 |         let worker = self
 188 |     2 |             .workers
 189 |     2 |             .get_mut(worker_id)
 190 |     2 |             .err_tip(|| format!("Worker {worker_id} doesn't exist in the pool"))?;
 191 |     2 |         worker.is_draining = is_draining;
 192 |     2 |         self.worker_change_notify.notify_one();
 193 |     2 |         Ok(())
 194 |     2 |     }
 195 |       |
 196 |       |     #[cfg_attr(not(feature = "worker_find_logging"), allow(unused_variables))]
 197 |    38 |     fn inner_worker_checker(
 198 |    38 |         (worker_id, w): &(&WorkerId, &Worker),
 199 |    38 |         platform_properties: &PlatformProperties,
 200 |    38 |     ) -> bool {
 201 |       |         #[cfg(feature = "worker_find_logging")]
 202 |       |         {
 203 |       |             if !w.can_accept_work() {
 204 |       |                 info!(
 205 |       |                     "Worker {worker_id} cannot accept work because is_paused: {}, is_draining: {}",
 206 |       |                     w.is_paused, w.is_draining
 207 |       |                 );
 208 |       |                 return false;
 209 |       |             }
 210 |       |             if !platform_properties.is_satisfied_by(&w.platform_properties) {
 211 |       |                 info!("Worker {worker_id} properties are insufficient");
 212 |       |                 return false;
 213 |       |             }
 214 |       |             return true;
 215 |       |         }
 216 |       |         #[cfg(not(feature = "worker_find_logging"))]
 217 |       |         {
 218 |    38 |             w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
     |       |   Branch (218:13): [True: 37, False: 1]
     |       |   Branch (218:13): [Folded - Ignored]
 219 |       |         }
 220 |    38 |     }
 221 |       |
 222 |    50 |     fn inner_find_worker_for_action(
 223 |    50 |         &self,
 224 |    50 |         platform_properties: &PlatformProperties,
 225 |    50 |     ) -> Option<WorkerId> {
 226 |    50 |         let mut workers_iter = self.workers.iter();
 227 |    50 |         let workers_iter =
 228 |    50 |             match self.allocation_strategy {
 229 |       |                 // Use rfind to get the least recently used that satisfies the properties.
 230 |    50 |                 WorkerAllocationStrategy::LeastRecentlyUsed => workers_iter
 231 |    50 |                     .rfind(|worker| Self::inner_worker_checker(worker, platform_properties)),
 232 |       |                 // Use find to get the most recently used that satisfies the properties.
 233 |     0 |                 WorkerAllocationStrategy::MostRecentlyUsed => workers_iter
 234 |     0 |                     .find(|worker| Self::inner_worker_checker(worker, platform_properties)),
 235 |       |             };
 236 |    50 |         workers_iter.map(|(_, w)| w.id.clone())
 237 |    50 |     }
 238 |       |
 239 |    10 |     async fn update_action(
 240 |    10 |         &mut self,
 241 |    10 |         worker_id: &WorkerId,
 242 |    10 |         operation_id: &OperationId,
 243 |    10 |         update: UpdateOperationType,
 244 |    10 |     ) -> Result<(), Error> {
 245 |    10 |         let worker = self.workers.get_mut(worker_id).err_tip(|| {
 246 |     0 |             format!("Worker {worker_id} does not exist in SimpleScheduler::update_action")
 247 |     0 |         })?;
 248 |       |
 249 |       |         // Ensure the worker is supposed to be running the operation.
 250 |    10 |         let pending_completion = !worker.running_action_infos.contains_key(operation_id);
 251 |    10 |         if pending_completion
     |       |   Branch (251:12): [True: 1, False: 9]
     |       |   Branch (251:12): [Folded - Ignored]
 252 |     1 |             && !self
     |       |   Branch (252:16): [True: 1, False: 0]
     |       |   Branch (252:16): [Folded - Ignored]
 253 |     1 |                 .pending_results
 254 |     1 |                 .get(worker_id)
 255 |     1 |                 .is_some_and(|pending_operations| pending_operations.contains(operation_id))
 256 |       |         {
 257 |     1 |             return Err(make_err!(
 258 |     1 |                 Code::Internal,
 259 |     1 |                 "Operation {operation_id} should not be running on worker {worker_id} in SimpleScheduler::update_action"
 260 |     1 |             ));
 261 |     9 |         }
 262 |       |
 263 |     9 |         let (is_finished, due_to_backpressure) = match &update {
 264 |     6 |             UpdateOperationType::UpdateWithActionStage(action_stage) => {
 265 |     6 |                 (action_stage.is_finished(), false)
 266 |       |             }
 267 |     0 |             UpdateOperationType::KeepAlive => (false, false),
 268 |     3 |             UpdateOperationType::UpdateWithError(err) => {
 269 |     3 |                 (true, err.code == Code::ResourceExhausted)
 270 |       |             }
 271 |     0 |             UpdateOperationType::UpdateWithDisconnect => (true, false),
 272 |       |         };
 273 |       |
 274 |       |         // Update the operation in the worker state manager.
 275 |       |         {
 276 |     9 |             let update_operation_res = self
 277 |     9 |                 .worker_state_manager
 278 |     9 |                 .update_operation(operation_id, worker_id, update)
 279 |     9 |                 .await
 280 |     9 |                 .err_tip(|| "in update_operation on SimpleScheduler::update_action");
 281 |     9 |             if let Err(err) = update_operation_res {
     |       |   Branch (281:20): [True: 0, False: 9]
     |       |   Branch (281:20): [Folded - Ignored]
 282 |     0 |                 error!(
 283 |       |                     ?operation_id,
 284 |       |                     ?worker_id,
 285 |       |                     ?err,
 286 |     0 |                     "Failed to update_operation on update_action"
 287 |       |                 );
 288 |     0 |                 return Err(err);
 289 |     9 |             }
 290 |       |         }
 291 |       |
 292 |     9 |         if !is_finished {
     |       |   Branch (292:12): [True: 0, False: 9]
     |       |   Branch (292:12): [Folded - Ignored]
 293 |     0 |             return Ok(());
 294 |     9 |         }
 295 |       |
 296 |     9 |         if pending_completion {
     |       |   Branch (296:12): [True: 0, False: 9]
     |       |   Branch (296:12): [Folded - Ignored]
 297 |       |             // This is absolutely always true, but pattern match anyway.
 298 |     0 |             if let Some(pending_operations) = self.pending_results.get_mut(worker_id) {
     |       |   Branch (298:20): [True: 0, False: 0]
     |       |   Branch (298:20): [Folded - Ignored]
 299 |     0 |                 pending_operations.remove(operation_id);
 300 |     0 |                 if pending_operations.is_empty() {
     |       |   Branch (300:20): [True: 0, False: 0]
     |       |   Branch (300:20): [Folded - Ignored]
 301 |     0 |                     self.pending_results.remove(worker_id);
 302 |     0 |                 }
 303 |     0 |                 return Ok(());
 304 |     0 |             }
 305 |     9 |         }
 306 |       |
 307 |     9 |         Self::complete_worker_action(
 308 |     9 |             worker,
 309 |     9 |             operation_id,
 310 |     9 |             due_to_backpressure,
 311 |     9 |             &self.worker_change_notify,
 312 |     9 |         )
 313 |     9 |         .await
 314 |    10 |     }
 315 |       |
 316 |     9 |     async fn complete_worker_action(
 317 |     9 |         worker: &mut Worker,
 318 |     9 |         operation_id: &OperationId,
 319 |     9 |         due_to_backpressure: bool,
 320 |     9 |         notify: &Notify,
 321 |     9 |     ) -> Result<(), Error> {
 322 |       |         // Clear this action from the current worker if finished.
 323 |     9 |         let complete_action_res = {
 324 |     9 |             let was_paused = !worker.can_accept_work();
 325 |       |
 326 |       |             // Note: We need to run this before dealing with backpressure logic.
 327 |     9 |             let complete_action_res = worker.complete_action(operation_id).await;
 328 |       |
 329 |       |             // Only pause if there's an action still waiting that will unpause.
 330 |     9 |             if (was_paused || due_to_backpressure) && worker.has_actions() {
     |       |   Branch (330:17): [True: 0, False: 9]
     |       |   Branch (330:31): [True: 0, False: 9]
     |       |   Branch (330:55): [True: 0, False: 0]
     |       |   Branch (330:17): [Folded - Ignored]
     |       |   Branch (330:31): [Folded - Ignored]
     |       |   Branch (330:55): [Folded - Ignored]
 331 |     0 |                 worker.is_paused = true;
 332 |     9 |             }
 333 |     9 |             complete_action_res
 334 |       |         };
 335 |       |
 336 |     9 |         notify.notify_one();
 337 |       |
 338 |     9 |         complete_action_res
 339 |     9 |     }
 340 |       |
 341 |     0 |     async fn notify_complete(
 342 |     0 |         &mut self,
 343 |     0 |         worker_id: &WorkerId,
 344 |     0 |         operation_id: &OperationId,
 345 |     0 |     ) -> Result<(), Error> {
 346 |     0 |         let worker = self.workers.get_mut(worker_id).err_tip(|| {
 347 |     0 |             format!("Worker {worker_id} does not exist in SimpleScheduler::notify_complete")
 348 |     0 |         })?;
 349 |       |
 350 |       |         // Ensure the worker is supposed to be running the operation.
 351 |     0 |         if !worker.running_action_infos.contains_key(operation_id) {
     |       |   Branch (351:12): [True: 0, False: 0]
     |       |   Branch (351:12): [Folded - Ignored]
 352 |     0 |             let err = make_err!(
 353 |     0 |                 Code::Internal,
 354 |       |                 "Operation {operation_id} should not be running on worker {worker_id} in SimpleScheduler::update_action"
 355 |       |             );
 356 |     0 |             return Err(err);
 357 |     0 |         }
 358 |       |
 359 |     0 |         match self.pending_results.entry(worker_id.clone()) {
 360 |     0 |             std::collections::hash_map::Entry::Occupied(mut occupied_entry) => {
 361 |     0 |                 occupied_entry.get_mut().insert(operation_id.clone());
 362 |     0 |             }
 363 |     0 |             std::collections::hash_map::Entry::Vacant(vacant_entry) => {
 364 |     0 |                 vacant_entry
 365 |     0 |                     .insert(HashSet::new())
 366 |     0 |                     .insert(operation_id.clone());
 367 |     0 |             }
 368 |       |         }
 369 |       |
 370 |     0 |         Self::complete_worker_action(worker, operation_id, false, &self.worker_change_notify).await
 371 |     0 |     }
 372 |       |
 373 |       |     /// Notifies the specified worker to run the given action and handles errors by evicting
 374 |       |     /// the worker if the notification fails.
 375 |    32 |     async fn worker_notify_run_action(
 376 |    32 |         &mut self,
 377 |    32 |         worker_id: WorkerId,
 378 |    32 |         operation_id: OperationId,
 379 |    32 |         action_info: ActionInfoWithProps,
 380 |    32 |     ) -> Result<(), Error> {
 381 |    32 |         if let Some(worker) = self.workers.get_mut(&worker_id) {
     |       |   Branch (381:16): [True: 0, False: 0]
     |       |   Branch (381:16): [Folded - Ignored]
     |       |   Branch (381:16): [True: 4, False: 0]
     |       |   Branch (381:16): [True: 27, False: 0]
     |       |   Branch (381:16): [True: 1, False: 0]
 382 |    32 |             let notify_worker_result = worker
 383 |    32 |                 .notify_update(WorkerUpdate::RunAction((operation_id, action_info.clone())))
 384 |    32 |                 .await;
 385 |       |
 386 |    32 |             if let Err(notify_worker_result) = notify_worker_result {
     |       |   Branch (386:20): [True: 0, False: 0]
     |       |   Branch (386:20): [Folded - Ignored]
     |       |   Branch (386:20): [True: 0, False: 4]
     |       |   Branch (386:20): [True: 1, False: 26]
     |       |   Branch (386:20): [True: 0, False: 1]
 387 |     1 |                 warn!(
 388 |       |                     ?worker_id,
 389 |       |                     ?action_info,
 390 |       |                     ?notify_worker_result,
 391 |     1 |                     "Worker command failed, removing worker",
 392 |       |                 );
 393 |       |
 394 |       |                 // A slightly nasty way of figuring out that the worker disconnected
 395 |       |                 // from send_msg_to_worker without introducing complexity to the
 396 |       |                 // code path from here to there.
 397 |     1 |                 let is_disconnect = notify_worker_result.code == Code::Internal
     |       |   Branch (397:37): [True: 0, False: 0]
     |       |   Branch (397:37): [Folded - Ignored]
     |       |   Branch (397:37): [True: 0, False: 0]
     |       |   Branch (397:37): [True: 1, False: 0]
     |       |   Branch (397:37): [True: 0, False: 0]
 398 |     1 |                     && notify_worker_result.messages.len() == 1
     |       |   Branch (398:24): [True: 0, False: 0]
     |       |   Branch (398:24): [Folded - Ignored]
     |       |   Branch (398:24): [True: 0, False: 0]
     |       |   Branch (398:24): [True: 1, False: 0]
     |       |   Branch (398:24): [True: 0, False: 0]
 399 |     1 |                     && notify_worker_result.messages[0] == "Worker Disconnected";
 400 |       |
 401 |     1 |                 let err = make_err!(
 402 |     1 |                     Code::Internal,
 403 |       |                     "Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}",
 404 |       |                 );
 405 |       |
 406 |     1 |                 return Result::<(), _>::Err(err.clone()).merge(
 407 |     1 |                     self.immediate_evict_worker(&worker_id, err, is_disconnect)
 408 |     1 |                         .await,
 409 |       |                 );
 410 |    31 |             }
 411 |       |         } else {
 412 |     0 |             warn!(
 413 |       |                 ?worker_id,
 414 |       |                 ?operation_id,
 415 |       |                 ?action_info,
 416 |     0 |                 "Worker not found in worker map in worker_notify_run_action"
 417 |       |             );
 418 |       |         }
 419 |    31 |         Ok(())
 420 |    32 |     }
 421 |       |
 422 |       |     /// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it.
 423 |     9 |     async fn immediate_evict_worker(
 424 |     9 |         &mut self,
 425 |     9 |         worker_id: &WorkerId,
 426 |     9 |         err: Error,
 427 |     9 |         is_disconnect: bool,
 428 |     9 |     ) -> Result<(), Error> {
 429 |     9 |         let mut result = Ok(());
 430 |     9 |         if let Some(mut worker) = self.remove_worker(worker_id) {
     |       |   Branch (430:16): [True: 8, False: 0]
     |       |   Branch (430:16): [Folded - Ignored]
     |       |   Branch (430:16): [True: 0, False: 0]
     |       |   Branch (430:16): [True: 1, False: 0]
     |       |   Branch (430:16): [True: 0, False: 0]
 431 |       |             // We don't care if we fail to send message to worker, this is only a best attempt.
 432 |     9 |             drop(worker.notify_update(WorkerUpdate::Disconnect).await);
 433 |     9 |             let update = if is_disconnect {
     |       |   Branch (433:29): [True: 0, False: 8]
     |       |   Branch (433:29): [Folded - Ignored]
     |       |   Branch (433:29): [True: 0, False: 0]
     |       |   Branch (433:29): [True: 0, False: 1]
     |       |   Branch (433:29): [True: 0, False: 0]
 434 |     0 |                 UpdateOperationType::UpdateWithDisconnect
 435 |       |             } else {
 436 |     9 |                 UpdateOperationType::UpdateWithError(err)
 437 |       |             };
 438 |     9 |             for (operation_id, _) in worker.running_action_infos.drain() {
 439 |     8 |                 result = result.merge(
 440 |     8 |                     self.worker_state_manager
 441 |     8 |                         .update_operation(&operation_id, worker_id, update.clone())
 442 |     8 |                         .await,
 443 |       |                 );
 444 |       |             }
 445 |     0 |         }
 446 |       |         // Note: Calling this many time is very cheap, it'll only trigger `do_try_match` once.
 447 |       |         // TODO(palfrey) This should be moved to inside the Workers struct.
 448 |     9 |         self.worker_change_notify.notify_one();
 449 |     9 |         result
 450 |     9 |     }
 451 |       | }
 452 |       |
 453 |       | #[derive(Debug, MetricsComponent)]
 454 |       | pub struct ApiWorkerScheduler {
 455 |       |     #[metric]
 456 |       |     inner: Mutex<ApiWorkerSchedulerImpl>,
 457 |       |     #[metric(group = "platform_property_manager")]
 458 |       |     platform_property_manager: Arc<PlatformPropertyManager>,
 459 |       |
 460 |       |     #[metric(
 461 |       |         help = "Timeout of how long to evict workers if no response in this given amount of time in seconds."
 462 |       |     )]
 463 |       |     worker_timeout_s: u64,
 464 |       |     _operation_keep_alive_spawn: JoinHandleDropGuard<()>,
 465 |       | }
 466 |       |
 467 |       | impl ApiWorkerScheduler {
 468 |    29 |     pub fn new(
 469 |    29 |         worker_state_manager: Arc<dyn WorkerStateManager>,
 470 |    29 |         platform_property_manager: Arc<PlatformPropertyManager>,
 471 |    29 |         allocation_strategy: WorkerAllocationStrategy,
 472 |    29 |         worker_change_notify: Arc<Notify>,
 473 |    29 |         worker_timeout_s: u64,
 474 |    29 |     ) -> Arc<Self> {
 475 |    29 |         let (operation_keep_alive_tx, mut operation_keep_alive_rx) = mpsc::unbounded_channel();
 476 |    29 |         Arc::new(Self {
 477 |    29 |             inner: Mutex::new(ApiWorkerSchedulerImpl {
 478 |    29 |                 workers: Workers(LruCache::unbounded()),
 479 |    29 |                 pending_results: HashMap::new(),
 480 |    29 |                 worker_state_manager: worker_state_manager.clone(),
 481 |    29 |                 allocation_strategy,
 482 |    29 |                 worker_change_notify,
 483 |    29 |                 operation_keep_alive_tx,
 484 |    29 |             }),
 485 |    29 |             platform_property_manager,
 486 |    29 |             worker_timeout_s,
 487 |    29 |             _operation_keep_alive_spawn: spawn!(
 488 |       |                 "simple_scheduler_operation_keep_alive",
 489 |    24 |                 async move {
 490 |       |                     const RECV_MANY_LIMIT: usize = 256;
 491 |    24 |                     let mut messages = Vec::with_capacity(RECV_MANY_LIMIT);
 492 |       |                     loop {
 493 |    24 |                         messages.clear();
 494 |    24 |                         operation_keep_alive_rx
 495 |    24 |                             .recv_many(&mut messages, RECV_MANY_LIMIT)
 496 |    24 |                             .await;
 497 |     0 |                         if messages.is_empty() {
     |       |   Branch (497:28): [True: 0, False: 0]
     |       |   Branch (497:28): [Folded - Ignored]
 498 |     0 |                             return; // Looks like our sender has been dropped.
 499 |     0 |                         }
 500 |     0 |                         for (operation_id, worker_id) in messages.drain(..) {
 501 |     0 |                             let update_operation_res = worker_state_manager
 502 |     0 |                                 .update_operation(
 503 |     0 |                                     &operation_id,
 504 |     0 |                                     &worker_id,
 505 |     0 |                                     UpdateOperationType::KeepAlive,
 506 |     0 |                                 )
 507 |     0 |                                 .await;
 508 |     0 |                             if let Err(err) = update_operation_res {
     |       |   Branch (508:36): [True: 0, False: 0]
     |       |   Branch (508:36): [Folded - Ignored]
 509 |     0 |                                 warn!(
 510 |       |                                     ?err,
 511 |     0 |                                     "Error while running worker_keep_alive_received, maybe job is done?"
 512 |       |                                 );
 513 |     0 |                             }
 514 |       |                         }
 515 |       |                     }
 516 |     0 |                 }
 517 |       |             ),
 518 |       |         })
 519 |    29 |     }
 520 |       |
 521 |    32 |     pub async fn worker_notify_run_action(
 522 |    32 |         &self,
 523 |    32 |         worker_id: WorkerId,
 524 |    32 |         operation_id: OperationId,
 525 |    32 |         action_info: ActionInfoWithProps,
 526 |    32 |     ) -> Result<(), Error> {
 527 |    32 |         let mut inner = self.inner.lock().await;
 528 |    32 |         inner
 529 |    32 |             .worker_notify_run_action(worker_id, operation_id, action_info)
 530 |    32 |             .await
 531 |    32 |     }
 532 |       |
 533 |       |     /// Attempts to find a worker that is capable of running this action.
 534 |       |     // TODO(palfrey) This algorithm is not very efficient. Simple testing using a tree-like
 535 |       |     // structure showed worse performance on a 10_000 worker * 7 properties * 1000 queued tasks
 536 |       |     // simulation of worst cases in a single threaded environment.
 537 |    50 |     pub async fn find_worker_for_action(
 538 |    50 |         &self,
 539 |    50 |         platform_properties: &PlatformProperties,
 540 |    50 |     ) -> Option<WorkerId> {
 541 |    50 |         let inner = self.inner.lock().await;
 542 |    50 |         inner.inner_find_worker_for_action(platform_properties)
 543 |    50 |     }
 544 |       |
 545 |       |     /// Checks to see if the worker exists in the worker pool. Should only be used in unit tests.
 546 |       |     #[must_use]
 547 |     7 |     pub async fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool {
 548 |     7 |         let inner = self.inner.lock().await;
 549 |     7 |         inner.workers.contains(worker_id)
 550 |     7 |     }
 551 |       |
 552 |       |     /// A unit test function used to send the keep alive message to the worker from the server.
 553 |     0 |     pub async fn send_keep_alive_to_worker_for_test(
 554 |     0 |         &self,
 555 |     0 |         worker_id: &WorkerId,
 556 |     1 |     ) -> Result<(), Error> {
 557 |     1 |         let mut inner = self.inner.lock().await;
 558 |     1 |         let worker = inner.workers.get_mut(worker_id).ok_or_else(|| {
 559 |     0 |             make_input_err!("WorkerId '{}' does not exist in workers map", worker_id)
 560 |     0 |         })?;
 561 |     1 |         worker.keep_alive()
 562 |     1 |     }
 563 |       | }
 564 |       |
 565 |       | #[async_trait]
 566 |       | impl WorkerScheduler for ApiWorkerScheduler {
 567 |     1 |     fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
 568 |     1 |         self.platform_property_manager.as_ref()
 569 |     1 |     }
 570 |       |
 571 |    70 |     async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
 572 |    35 |         let mut inner = self.inner.lock().await;
 573 |    35 |         let worker_id = worker.id.clone();
 574 |    35 |         let result = inner
 575 |    35 |             .add_worker(worker)
 576 |    35 |             .err_tip(|| "Error while adding worker, removing from pool");
 577 |    35 |         if let Err(err) = result {
     |       |   Branch (577:16): [True: 0, False: 35]
     |       |   Branch (577:16): [Folded - Ignored]
 578 |     0 |             return Result::<(), _>::Err(err.clone())
 579 |     0 |                 .merge(inner.immediate_evict_worker(&worker_id, err, false).await);
 580 |    35 |         }
 581 |    35 |         Ok(())
 582 |    70 |     }
 583 |       |
 584 |       |     async fn update_action(
 585 |       |         &self,
 586 |       |         worker_id: &WorkerId,
 587 |       |         operation_id: &OperationId,
 588 |       |         update: UpdateOperationType,
 589 |    20 |     ) -> Result<(), Error> {
 590 |    10 |         let mut inner = self.inner.lock().await;
 591 |    10 |         inner.update_action(worker_id, operation_id, update).await
 592 |    20 |     }
 593 |       |
 594 |       |     async fn notify_complete(
 595 |       |         &self,
 596 |       |         worker_id: &WorkerId,
 597 |       |         operation_id: &OperationId,
 598 |     0 |     ) -> Result<(), Error> {
 599 |     0 |         let mut inner = self.inner.lock().await;
 600 |     0 |         inner.notify_complete(worker_id, operation_id).await
 601 |     0 |     }
 602 |       |
 603 |       |     async fn worker_keep_alive_received(
 604 |       |         &self,
 605 |       |         worker_id: &WorkerId,
 606 |       |         timestamp: WorkerTimestamp,
 607 |     4 |     ) -> Result<(), Error> {
 608 |     2 |         let mut inner = self.inner.lock().await;
 609 |     2 |         inner
 610 |     2 |             .refresh_lifetime(worker_id, timestamp)
 611 |     2 |             .err_tip(|| "Error refreshing lifetime in worker_keep_alive_received()")
 612 |     4 |     }
 613 |       |
 614 |    12 |     async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
 615 |     6 |         let mut inner = self.inner.lock().await;
 616 |     6 |         inner
 617 |     6 |             .immediate_evict_worker(
 618 |     6 |                 worker_id,
 619 |     6 |                 make_err!(Code::Internal, "Received request to remove worker"),
 620 |     6 |                 false,
 621 |     6 |             )
 622 |     6 |             .await
 623 |    12 |     }
 624 |       |
 625 |     0 |     async fn shutdown(&self, shutdown_guard: ShutdownGuard) {
 626 |     0 |         let mut inner = self.inner.lock().await;
 627 |     0 |         while let Some(worker_id) = inner
     |       |   Branch (627:19): [True: 0, False: 0]
     |       |   Branch (627:19): [Folded - Ignored]
 628 |     0 |             .workers
 629 |     0 |             .peek_lru()
 630 |     0 |             .map(|(worker_id, _worker)| worker_id.clone())
 631 |       |         {
 632 |     0 |             if let Err(err) = inner
     |       |   Branch (632:20): [True: 0, False: 0]
     |       |   Branch (632:20): [Folded - Ignored]
 633 |     0 |                 .immediate_evict_worker(
 634 |     0 |                     &worker_id,
 635 |     0 |                     make_err!(Code::Internal, "Scheduler shutdown"),
 636 |     0 |                     true,
 637 |     0 |                 )
 638 |     0 |                 .await
 639 |       |             {
 640 |     0 |                 error!(?err, "Error evicting worker on shutdown.");
 641 |     0 |             }
 642 |       |         }
 643 |     0 |         drop(shutdown_guard);
 644 |     0 |     }
 645 |       |
 646 |    10 |     async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
 647 |     5 |         let mut inner = self.inner.lock().await;
 648 |       |
 649 |     5 |         let mut result = Ok(());
 650 |       |         // Items should be sorted based on last_update_timestamp, so we don't need to iterate the entire
 651 |       |         // map most of the time.
 652 |     5 |         let worker_ids_to_remove: Vec<WorkerId> = inner
 653 |     5 |             .workers
 654 |     5 |             .iter()
 655 |     5 |             .rev()
 656 |     6 |             .map_while(|(worker_id, worker)| {
 657 |     6 |                 if worker.last_update_timestamp <= now_timestamp - self.worker_timeout_s {
     |       |   Branch (657:20): [True: 2, False: 4]
     |       |   Branch (657:20): [Folded - Ignored]
 658 |     2 |                     Some(worker_id.clone())
 659 |       |                 } else {
 660 |     4 |                     None
 661 |       |                 }
 662 |     6 |             })
 663 |     5 |             .collect();
 664 |     7 |         for worker_id in &worker_ids_to_remove {
 665 |     2 |             warn!(?worker_id, "Worker timed out, removing from pool");
 666 |     2 |             result = result.merge(
 667 |     2 |                 inner
 668 |     2 |                     .immediate_evict_worker(
 669 |     2 |                         worker_id,
 670 |     2 |                         make_err!(
 671 |     2 |                             Code::Internal,
 672 |     2 |                             "Worker {worker_id} timed out, removing from pool"
 673 |     2 |                         ),
 674 |     2 |                         false,
 675 |     2 |                     )
 676 |     2 |                     .await,
 677 |       |             );
 678 |       |         }
 679 |       |
 680 |     5 |         result
 681 |    10 |     }
 682 |       |
 683 |     4 |     async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
 684 |     2 |         let mut inner = self.inner.lock().await;
 685 |     2 |         inner.set_drain_worker(worker_id, is_draining).await
 686 |     4 |     }
 687 |       | }
 688 |       |
 689 |       | impl RootMetricsComponent for ApiWorkerScheduler {}