Coverage Report

Created: 2025-12-17 22:46

/build/source/nativelink-scheduler/src/api_worker_scheduler.rs

// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    See LICENSE file for details
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::ops::{Deref, DerefMut};
use core::sync::atomic::{AtomicU64, Ordering};
use core::time::Duration;
use std::sync::Arc;
use std::time::{Instant, UNIX_EPOCH};

use async_lock::Mutex;
use lru::LruCache;
use nativelink_config::schedulers::WorkerAllocationStrategy;
use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err};
use nativelink_metric::{
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
    RootMetricsComponent, group,
};
use nativelink_util::action_messages::{OperationId, WorkerId};
use nativelink_util::operation_state_manager::{UpdateOperationType, WorkerStateManager};
use nativelink_util::platform_properties::PlatformProperties;
use nativelink_util::shutdown_guard::ShutdownGuard;
use tokio::sync::Notify;
use tonic::async_trait;
use tracing::{error, info, trace, warn};

/// Metrics for tracking scheduler performance.
#[derive(Debug, Default)]
pub struct SchedulerMetrics {
    /// Total number of worker additions.
    pub workers_added: AtomicU64,
    /// Total number of worker removals.
    pub workers_removed: AtomicU64,
    /// Total number of `find_worker_for_action` calls.
    pub find_worker_calls: AtomicU64,
    /// Total number of successful worker matches.
    pub find_worker_hits: AtomicU64,
    /// Total number of failed worker matches (no worker found).
    pub find_worker_misses: AtomicU64,
    /// Total time spent in `find_worker_for_action` (nanoseconds).
    pub find_worker_time_ns: AtomicU64,
    /// Total number of workers iterated during find operations.
    pub workers_iterated: AtomicU64,
    /// Total number of action dispatches.
    pub actions_dispatched: AtomicU64,
    /// Total number of keep-alive updates.
    pub keep_alive_updates: AtomicU64,
    /// Total number of worker timeouts.
    pub worker_timeouts: AtomicU64,
}
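
// Reading these counters (illustrative sketch, not part of this file): the
// raw atomics can be combined into derived statistics. Everything below
// assumes only the fields defined above; `find_worker_summary` is a
// hypothetical helper.
//
//     fn find_worker_summary(m: &SchedulerMetrics) -> (f64, u64) {
//         let calls = m.find_worker_calls.load(Ordering::Relaxed).max(1);
//         let hits = m.find_worker_hits.load(Ordering::Relaxed);
//         // Fraction of find calls that located a worker.
//         let hit_rate = hits as f64 / calls as f64;
//         // Mean time per find call, in nanoseconds.
//         let avg_ns = m.find_worker_time_ns.load(Ordering::Relaxed) / calls;
//         (hit_rate, avg_ns)
//     }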

use crate::platform_property_manager::PlatformPropertyManager;
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp, WorkerUpdate};
use crate::worker_capability_index::WorkerCapabilityIndex;
use crate::worker_registry::SharedWorkerRegistry;
use crate::worker_scheduler::WorkerScheduler;

#[derive(Debug)]
struct Workers(LruCache<WorkerId, Worker>);

impl Deref for Workers {
    type Target = LruCache<WorkerId, Worker>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for Workers {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
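
// `Workers` is a thin newtype over `LruCache` so that scheduler-specific
// trait impls (like the manual `MetricsComponent` below) can live on it,
// while `Deref`/`DerefMut` keep call sites reading like a plain `LruCache`.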
// Note: This cannot use the derive macro because the derive macro does not
// support `LruCache` or structs with nameless (tuple) fields.
impl MetricsComponent for Workers {
    fn publish(
        &self,
        _kind: MetricKind,
        _field_metadata: MetricFieldData,
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
        let _enter = group!("workers").entered();
        for (worker_id, worker) in self.iter() {
            let _enter = group!(worker_id).entered();
            worker.publish(MetricKind::Component, MetricFieldData::default())?;
        }
        Ok(MetricPublishKnownKindData::Component)
    }
}

/// A collection of workers that are available to run tasks.
#[derive(MetricsComponent)]
struct ApiWorkerSchedulerImpl {
    /// A `LruCache` of workers available based on `allocation_strategy`.
    #[metric(group = "workers")]
    workers: Workers,

    /// The worker state manager.
    #[metric(group = "worker_state_manager")]
    worker_state_manager: Arc<dyn WorkerStateManager>,
    /// The allocation strategy for workers.
    allocation_strategy: WorkerAllocationStrategy,
    /// A channel to notify the matching engine that the worker pool has changed.
    worker_change_notify: Arc<Notify>,
    /// Worker registry for tracking worker liveness.
    worker_registry: SharedWorkerRegistry,

    /// Whether the worker scheduler is shutting down.
    shutting_down: bool,

    /// Index for fast worker capability lookup.
    /// Used to accelerate `find_worker_for_action` by filtering candidates
    /// based on properties before doing a linear scan.
    capability_index: WorkerCapabilityIndex,
}

impl core::fmt::Debug for ApiWorkerSchedulerImpl {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("ApiWorkerSchedulerImpl")
            .field("workers", &self.workers)
            .field("allocation_strategy", &self.allocation_strategy)
            .field("worker_change_notify", &self.worker_change_notify)
            .field(
                "capability_index_size",
                &self.capability_index.worker_count(),
            )
            .field("worker_registry", &self.worker_registry)
            .finish_non_exhaustive()
    }
}

impl ApiWorkerSchedulerImpl {
    /// Refreshes the lifetime of the worker with the given timestamp.
    ///
    /// Instead of sending N keepalive messages (one per operation),
    /// we now send a single worker heartbeat. The worker registry tracks worker liveness,
    /// and timeout detection checks the worker's `last_seen` instead of per-operation timestamps.
    ///
    /// Note: This only updates the local worker state. The worker registry is updated
    /// separately after releasing the inner lock to reduce contention.
    fn refresh_lifetime(
        &mut self,
        worker_id: &WorkerId,
        timestamp: WorkerTimestamp,
    ) -> Result<(), Error> {
        let worker = self.workers.0.peek_mut(worker_id).ok_or_else(|| {
            make_input_err!(
                "Worker not found in worker map in refresh_lifetime() {}",
                worker_id
            )
        })?;
        error_if!(
            worker.last_update_timestamp > timestamp,
            "Worker already had a timestamp of {}, but tried to update it with {}",
            worker.last_update_timestamp,
            timestamp
        );
        worker.last_update_timestamp = timestamp;

        trace!(
            ?worker_id,
            running_operations = worker.running_action_infos.len(),
            "Worker keepalive received"
        );

        Ok(())
    }

    /// Adds a worker to the pool.
    /// Note: This function will not do any task matching.
    fn add_worker(&mut self, worker: Worker) -> Result<(), Error> {
        let worker_id = worker.id.clone();
        let platform_properties = worker.platform_properties.clone();
        self.workers.put(worker_id.clone(), worker);

        // Add to capability index for fast matching
        self.capability_index
            .add_worker(&worker_id, &platform_properties);

        // Worker is not cloneable, and we do not want to send the initial connection results until
        // we have added it to the map, or we might get some strange race conditions due to the way
        // the multi-threaded runtime works.
        let worker = self.workers.peek_mut(&worker_id).unwrap();
        let res = worker
            .send_initial_connection_result()
            .err_tip(|| "Failed to send initial connection result to worker");
        if let Err(err) = &res {
            error!(
                ?worker_id,
                ?err,
                "Worker connection appears to have been closed while adding to pool"
            );
        }
        self.worker_change_notify.notify_one();
        res
    }

    /// Removes worker from pool.
    /// Note: The caller is responsible for any rescheduling of any tasks that might be
    /// running.
    fn remove_worker(&mut self, worker_id: &WorkerId) -> Option<Worker> {
        // Remove from capability index
        self.capability_index.remove_worker(worker_id);

        let result = self.workers.pop(worker_id);
        self.worker_change_notify.notify_one();
        result
    }

    /// Sets if the worker is draining or not.
    async fn set_drain_worker(
        &mut self,
        worker_id: &WorkerId,
        is_draining: bool,
    ) -> Result<(), Error> {
        let worker = self
            .workers
            .get_mut(worker_id)
            .err_tip(|| format!("Worker {worker_id} doesn't exist in the pool"))?;
        worker.is_draining = is_draining;
        self.worker_change_notify.notify_one();
        Ok(())
    }

    fn inner_find_worker_for_action(
        &self,
        platform_properties: &PlatformProperties,
        full_worker_logging: bool,
    ) -> Option<WorkerId> {
        // Use capability index to get candidate workers that match STATIC properties
        // (Exact, Unknown) and have the required property keys (Priority, Minimum).
        // This reduces complexity from O(W × P) to O(P × log(W)) for exact properties.
        let candidates = self
            .capability_index
            .find_matching_workers(platform_properties);

        if candidates.is_empty() {
            if full_worker_logging {
                info!("No workers in capability index match required properties");
            }
            return None;
        }

        // Check function for availability AND dynamic Minimum property verification.
        // The index only does presence checks for Minimum properties since their
        // values change dynamically as jobs are assigned to workers.
        let worker_matches = |(worker_id, w): &(&WorkerId, &Worker)| -> bool {
            if !w.can_accept_work() {
                if full_worker_logging {
                    info!(
                        "Worker {worker_id} cannot accept work: is_paused={}, is_draining={}",
                        w.is_paused, w.is_draining
                    );
                }
                return false;
            }

            // Verify Minimum properties at runtime (their values are dynamic)
            if !platform_properties.is_satisfied_by(&w.platform_properties, full_worker_logging) {
                return false;
            }

            true
        };

        // Now check constraints on filtered candidates.
        // Iterate in LRU order based on allocation strategy.
        let workers_iter = self.workers.iter();

        match self.allocation_strategy {
            // Use rfind to get the least recently used that satisfies the properties.
            WorkerAllocationStrategy::LeastRecentlyUsed => workers_iter
                .rev()
                .filter(|(worker_id, _)| candidates.contains(worker_id))
                .find(&worker_matches)
                .map(|(_, w)| w.id.clone()),

            // Use find to get the most recently used that satisfies the properties.
            WorkerAllocationStrategy::MostRecentlyUsed => workers_iter
                .filter(|(worker_id, _)| candidates.contains(worker_id))
                .find(&worker_matches)
                .map(|(_, w)| w.id.clone()),
        }
    }
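
    // Illustrative example (hypothetical property names and values, not from
    // this file): a task asking for Exact("os" = "linux") and
    // Minimum("mem" = 8) is first narrowed by the capability index to workers
    // that advertise "os" = "linux" and expose a "mem" key at all;
    // `worker_matches` above then re-checks each candidate's *current* "mem"
    // value against 8, since Minimum properties are consumed as actions are
    // assigned and restored when execution completes.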

    async fn update_action(
        &mut self,
        worker_id: &WorkerId,
        operation_id: &OperationId,
        update: UpdateOperationType,
    ) -> Result<(), Error> {
        let worker = self.workers.get_mut(worker_id).err_tip(|| {
            format!("Worker {worker_id} does not exist in SimpleScheduler::update_action")
        })?;

        // Ensure the worker is supposed to be running the operation.
        if !worker.running_action_infos.contains_key(operation_id) {
            let err = make_err!(
                Code::Internal,
                "Operation {operation_id} should not be running on worker {worker_id} in SimpleScheduler::update_action"
            );
            return Result::<(), _>::Err(err.clone())
                .merge(self.immediate_evict_worker(worker_id, err, false).await);
        }

        let (is_finished, due_to_backpressure) = match &update {
            UpdateOperationType::UpdateWithActionStage(action_stage) => {
                (action_stage.is_finished(), false)
            }
            UpdateOperationType::KeepAlive => (false, false),
            UpdateOperationType::UpdateWithError(err) => {
                (true, err.code == Code::ResourceExhausted)
            }
            UpdateOperationType::UpdateWithDisconnect => (true, false),
            UpdateOperationType::ExecutionComplete => {
                // No update here, just restoring platform properties.
                worker.execution_complete(operation_id);
                self.worker_change_notify.notify_one();
                return Ok(());
            }
        };
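
        // `is_finished` drives the completion cleanup below;
        // `due_to_backpressure` flags ResourceExhausted errors so the worker
        // can be paused (see below) rather than treated as having failed.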

        // Update the operation in the worker state manager.
        {
            let update_operation_res = self
                .worker_state_manager
                .update_operation(operation_id, worker_id, update)
                .await
                .err_tip(|| "in update_operation on SimpleScheduler::update_action");
            if let Err(err) = update_operation_res {
                error!(
                    %operation_id,
                    ?worker_id,
                    ?err,
                    "Failed to update_operation on update_action"
                );
                return Err(err);
            }
        }

        if !is_finished {
            return Ok(());
        }

        // Clear this action from the current worker if finished.
        let complete_action_res = {
            let was_paused = !worker.can_accept_work();

            // Note: We need to run this before dealing with backpressure logic.
            let complete_action_res = worker.complete_action(operation_id).await;

            // Only pause if there's an action still waiting that will unpause.
            if (was_paused || due_to_backpressure) && worker.has_actions() {
                worker.is_paused = true;
            }
            complete_action_res
        };

        self.worker_change_notify.notify_one();

        complete_action_res
    }

    /// Notifies the specified worker to run the given action and handles errors by evicting
    /// the worker if the notification fails.
    async fn worker_notify_run_action(
        &mut self,
        worker_id: WorkerId,
        operation_id: OperationId,
        action_info: ActionInfoWithProps,
    ) -> Result<(), Error> {
        if let Some(worker) = self.workers.get_mut(&worker_id) {
            let notify_worker_result = worker
                .notify_update(WorkerUpdate::RunAction((operation_id, action_info.clone())))
                .await;

            if let Err(notify_worker_result) = notify_worker_result {
                warn!(
                    ?worker_id,
                    ?action_info,
                    ?notify_worker_result,
                    "Worker command failed, removing worker",
                );

                // A slightly nasty way of figuring out that the worker disconnected
                // from send_msg_to_worker without introducing complexity to the
                // code path from here to there.
                let is_disconnect = notify_worker_result.code == Code::Internal
                    && notify_worker_result.messages.len() == 1
                    && notify_worker_result.messages[0] == "Worker Disconnected";

                let err = make_err!(
                    Code::Internal,
                    "Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}",
                );

                return Result::<(), _>::Err(err.clone()).merge(
                    self.immediate_evict_worker(&worker_id, err, is_disconnect)
                        .await,
                );
            }
            Ok(())
        } else {
            warn!(
                ?worker_id,
                %operation_id,
                ?action_info,
                "Worker not found in worker map in worker_notify_run_action"
            );
            // Ensure the operation is put back to queued state.
            self.worker_state_manager
                .update_operation(
                    &operation_id,
                    &worker_id,
                    UpdateOperationType::UpdateWithDisconnect,
                )
                .await
        }
    }

    /// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it.
    async fn immediate_evict_worker(
        &mut self,
        worker_id: &WorkerId,
        err: Error,
        is_disconnect: bool,
    ) -> Result<(), Error> {
        let mut result = Ok(());
        if let Some(mut worker) = self.remove_worker(worker_id) {
            // We don't care if we fail to send a message to the worker; this is only a best attempt.
            drop(worker.notify_update(WorkerUpdate::Disconnect).await);
            let update = if is_disconnect {
                UpdateOperationType::UpdateWithDisconnect
            } else {
                UpdateOperationType::UpdateWithError(err)
            };
            for (operation_id, _) in worker.running_action_infos.drain() {
                result = result.merge(
                    self.worker_state_manager
                        .update_operation(&operation_id, worker_id, update.clone())
                        .await,
                );
            }
        }
        // Note: Calling this many times is very cheap; it'll only trigger `do_try_match` once.
        // TODO(palfrey) This should be moved to inside the Workers struct.
        self.worker_change_notify.notify_one();
        result
    }
}

#[derive(Debug, MetricsComponent)]
pub struct ApiWorkerScheduler {
    #[metric]
    inner: Mutex<ApiWorkerSchedulerImpl>,
    #[metric(group = "platform_property_manager")]
    platform_property_manager: Arc<PlatformPropertyManager>,

    #[metric(
        help = "How long, in seconds, to wait without a response before evicting a worker."
    )]
    worker_timeout_s: u64,
    /// Shared worker registry for checking worker liveness.
    worker_registry: SharedWorkerRegistry,

    /// Performance metrics for observability.
    metrics: Arc<SchedulerMetrics>,
}

impl ApiWorkerScheduler {
    pub fn new(
        worker_state_manager: Arc<dyn WorkerStateManager>,
        platform_property_manager: Arc<PlatformPropertyManager>,
        allocation_strategy: WorkerAllocationStrategy,
        worker_change_notify: Arc<Notify>,
        worker_timeout_s: u64,
        worker_registry: SharedWorkerRegistry,
    ) -> Arc<Self> {
        Arc::new(Self {
            inner: Mutex::new(ApiWorkerSchedulerImpl {
                workers: Workers(LruCache::unbounded()),
                worker_state_manager: worker_state_manager.clone(),
                allocation_strategy,
                worker_change_notify,
                worker_registry: worker_registry.clone(),
                shutting_down: false,
                capability_index: WorkerCapabilityIndex::new(),
            }),
            platform_property_manager,
            worker_timeout_s,
            worker_registry,
            metrics: Arc::new(SchedulerMetrics::default()),
        })
    }
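
    // Construction sketch (illustrative; the bindings below are assumed to be
    // in scope and are not defined in this file):
    //
    //     let scheduler = ApiWorkerScheduler::new(
    //         state_manager,            // Arc<dyn WorkerStateManager>
    //         property_manager,         // Arc<PlatformPropertyManager>
    //         WorkerAllocationStrategy::LeastRecentlyUsed,
    //         Arc::new(Notify::new()),  // wakes the matching engine
    //         30,                       // evict workers silent for 30s
    //         worker_registry,          // SharedWorkerRegistry
    //     );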

    /// Returns a reference to the worker registry.
    pub const fn worker_registry(&self) -> &SharedWorkerRegistry {
        &self.worker_registry
    }

    pub async fn worker_notify_run_action(
        &self,
        worker_id: WorkerId,
        operation_id: OperationId,
        action_info: ActionInfoWithProps,
    ) -> Result<(), Error> {
        self.metrics
            .actions_dispatched
            .fetch_add(1, Ordering::Relaxed);
        let mut inner = self.inner.lock().await;
        inner
            .worker_notify_run_action(worker_id, operation_id, action_info)
            .await
    }

    /// Returns the scheduler metrics for observability.
    #[must_use]
    pub const fn get_metrics(&self) -> &Arc<SchedulerMetrics> {
        &self.metrics
    }

    /// Attempts to find a worker that is capable of running this action.
    // TODO(palfrey) This algorithm is not very efficient. Simple testing using a tree-like
    // structure showed worse performance on a 10_000 worker * 7 properties * 1000 queued tasks
    // simulation of worst cases in a single threaded environment.
    pub async fn find_worker_for_action(
        &self,
        platform_properties: &PlatformProperties,
        full_worker_logging: bool,
    ) -> Option<WorkerId> {
        let start = Instant::now();
        self.metrics
            .find_worker_calls
            .fetch_add(1, Ordering::Relaxed);

        let inner = self.inner.lock().await;
        let worker_count = inner.workers.len() as u64;
        let result = inner.inner_find_worker_for_action(platform_properties, full_worker_logging);

        // Track workers iterated (worst case is all workers)
        self.metrics
            .workers_iterated
            .fetch_add(worker_count, Ordering::Relaxed);

        if result.is_some() {
            self.metrics
                .find_worker_hits
                .fetch_add(1, Ordering::Relaxed);
        } else {
            self.metrics
                .find_worker_misses
                .fetch_add(1, Ordering::Relaxed);
        }

        #[allow(clippy::cast_possible_truncation)]
        self.metrics
            .find_worker_time_ns
            .fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
        result
    }

    /// Checks to see if the worker exists in the worker pool. Should only be used in unit tests.
    #[must_use]
    pub async fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool {
        let inner = self.inner.lock().await;
        inner.workers.contains(worker_id)
    }

    /// A unit test function used to send the keep alive message to the worker from the server.
    pub async fn send_keep_alive_to_worker_for_test(
        &self,
        worker_id: &WorkerId,
    ) -> Result<(), Error> {
        let mut inner = self.inner.lock().await;
        let worker = inner.workers.get_mut(worker_id).ok_or_else(|| {
            make_input_err!("WorkerId '{}' does not exist in workers map", worker_id)
        })?;
        worker.keep_alive()
    }
}

#[async_trait]
impl WorkerScheduler for ApiWorkerScheduler {
    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
        self.platform_property_manager.as_ref()
    }

    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
        let worker_id = worker.id.clone();
        let worker_timestamp = worker.last_update_timestamp;
        let mut inner = self.inner.lock().await;
        if inner.shutting_down {
            warn!("Rejected worker add during shutdown: {}", worker_id);
            return Err(make_err!(
                Code::Unavailable,
                "Received request to add worker while shutting down"
            ));
        }
        let result = inner
            .add_worker(worker)
            .err_tip(|| "Error while adding worker, removing from pool");
        if let Err(err) = result {
            return Result::<(), _>::Err(err.clone())
                .merge(inner.immediate_evict_worker(&worker_id, err, false).await);
        }

        let now = UNIX_EPOCH + Duration::from_secs(worker_timestamp);
        self.worker_registry.register_worker(&worker_id, now).await;

        self.metrics.workers_added.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    async fn update_action(
        &self,
        worker_id: &WorkerId,
        operation_id: &OperationId,
        update: UpdateOperationType,
    ) -> Result<(), Error> {
        let mut inner = self.inner.lock().await;
        inner.update_action(worker_id, operation_id, update).await
    }

    async fn worker_keep_alive_received(
        &self,
        worker_id: &WorkerId,
        timestamp: WorkerTimestamp,
    ) -> Result<(), Error> {
        {
            let mut inner = self.inner.lock().await;
            inner
                .refresh_lifetime(worker_id, timestamp)
                .err_tip(|| "Error refreshing lifetime in worker_keep_alive_received()")?;
        }
        let now = UNIX_EPOCH + Duration::from_secs(timestamp);
        self.worker_registry
            .update_worker_heartbeat(worker_id, now)
            .await;
        Ok(())
    }

    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
        self.worker_registry.remove_worker(worker_id).await;

        let mut inner = self.inner.lock().await;
        inner
            .immediate_evict_worker(
                worker_id,
                make_err!(Code::Internal, "Received request to remove worker"),
                false,
            )
            .await
    }

    async fn shutdown(&self, shutdown_guard: ShutdownGuard) {
        let mut inner = self.inner.lock().await;
        inner.shutting_down = true; // should reject further worker registration
        while let Some(worker_id) = inner
            .workers
            .peek_lru()
            .map(|(worker_id, _worker)| worker_id.clone())
        {
            if let Err(err) = inner
                .immediate_evict_worker(
                    &worker_id,
                    make_err!(Code::Internal, "Scheduler shutdown"),
                    true,
                )
                .await
            {
                error!(?err, "Error evicting worker on shutdown.");
            }
        }
        drop(shutdown_guard);
    }

    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
        // Check worker liveness using both the local timestamp (from the LRU)
        // and the worker registry. A worker is alive if either source says it's alive.
        let timeout = Duration::from_secs(self.worker_timeout_s);
        let now = UNIX_EPOCH + Duration::from_secs(now_timestamp);
        let timeout_threshold = now_timestamp.saturating_sub(self.worker_timeout_s);

        let workers_to_check: Vec<(WorkerId, bool)> = {
            let inner = self.inner.lock().await;
            inner
                .workers
                .iter()
                .map(|(worker_id, worker)| {
                    let local_alive = worker.last_update_timestamp > timeout_threshold;
                    (worker_id.clone(), local_alive)
                })
                .collect()
        };

        let mut worker_ids_to_remove = Vec::new();
        for (worker_id, local_alive) in workers_to_check {
            if local_alive {
                continue;
            }

            let registry_alive = self
                .worker_registry
                .is_worker_alive(&worker_id, timeout, now)
                .await;

            if !registry_alive {
                trace!(
                    ?worker_id,
                    local_alive,
                    registry_alive,
                    timeout_threshold,
                    "Worker timed out - neither local nor registry shows alive"
                );
                worker_ids_to_remove.push(worker_id);
            }
        }

        if worker_ids_to_remove.is_empty() {
            return Ok(());
        }

        let mut inner = self.inner.lock().await;
        let mut result = Ok(());

        for worker_id in &worker_ids_to_remove {
            warn!(?worker_id, "Worker timed out, removing from pool");
            result = result.merge(
                inner
                    .immediate_evict_worker(
                        worker_id,
                        make_err!(
                            Code::Internal,
                            "Worker {worker_id} timed out, removing from pool"
                        ),
                        false,
                    )
                    .await,
            );
        }

        result
    }
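
    // Worked example (hypothetical numbers): with worker_timeout_s = 30 and
    // now_timestamp = 100, timeout_threshold = 70. A worker whose local
    // last_update_timestamp is 65 fails the local check, but if the registry
    // still shows a heartbeat within the 30-second window it is treated as
    // alive and kept; only workers failing both checks are evicted.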

    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
        let mut inner = self.inner.lock().await;
        inner.set_drain_worker(worker_id, is_draining).await
    }
}

impl RootMetricsComponent for ApiWorkerScheduler {}