Coverage Report

Created: 2026-02-02 12:27

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/api_worker_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::ops::{Deref, DerefMut};
16
use core::sync::atomic::{AtomicU64, Ordering};
17
use core::time::Duration;
18
use std::sync::Arc;
19
use std::time::{Instant, UNIX_EPOCH};
20
21
use async_lock::Mutex;
22
use lru::LruCache;
23
use nativelink_config::schedulers::WorkerAllocationStrategy;
24
use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err};
25
use nativelink_metric::{
26
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
27
    RootMetricsComponent, group,
28
};
29
use nativelink_util::action_messages::{OperationId, WorkerId};
30
use nativelink_util::operation_state_manager::{UpdateOperationType, WorkerStateManager};
31
use nativelink_util::platform_properties::PlatformProperties;
32
use nativelink_util::shutdown_guard::ShutdownGuard;
33
use tokio::sync::Notify;
34
use tonic::async_trait;
35
use tracing::{error, info, trace, warn};
36
37
/// Counters used to observe how the worker scheduler is performing.
#[derive(Debug, Default)]
pub struct SchedulerMetrics {
    /// How many workers have been added.
    pub workers_added: AtomicU64,
    /// How many workers have been removed.
    pub workers_removed: AtomicU64,
    /// How many times `find_worker_for_action` has been invoked.
    pub find_worker_calls: AtomicU64,
    /// How many lookups ended with a matching worker.
    pub find_worker_hits: AtomicU64,
    /// How many lookups ended without any matching worker.
    pub find_worker_misses: AtomicU64,
    /// Cumulative time spent inside `find_worker_for_action`, in nanoseconds.
    pub find_worker_time_ns: AtomicU64,
    /// Cumulative number of workers iterated over by find operations.
    pub workers_iterated: AtomicU64,
    /// How many actions have been dispatched.
    pub actions_dispatched: AtomicU64,
    /// How many keep-alive updates have been handled.
    pub keep_alive_updates: AtomicU64,
    /// How many workers have timed out.
    pub worker_timeouts: AtomicU64,
}
61
62
use crate::platform_property_manager::PlatformPropertyManager;
63
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp, WorkerUpdate};
64
use crate::worker_capability_index::WorkerCapabilityIndex;
65
use crate::worker_registry::SharedWorkerRegistry;
66
use crate::worker_scheduler::WorkerScheduler;
67
68
#[derive(Debug)]
69
struct Workers(LruCache<WorkerId, Worker>);
70
71
impl Deref for Workers {
72
    type Target = LruCache<WorkerId, Worker>;
73
74
102
    fn deref(&self) -> &Self::Target {
75
102
        &self.0
76
102
    }
77
}
78
79
impl DerefMut for Workers {
80
128
    fn deref_mut(&mut self) -> &mut Self::Target {
81
128
        &mut self.0
82
128
    }
83
}
84
85
// Note: This could not be a derive macro because this derive-macro
86
// does not support LruCache and nameless field structs.
87
impl MetricsComponent for Workers {
88
0
    fn publish(
89
0
        &self,
90
0
        _kind: MetricKind,
91
0
        _field_metadata: MetricFieldData,
92
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
93
0
        let _enter = group!("workers").entered();
94
0
        for (worker_id, worker) in self.iter() {
95
0
            let _enter = group!(worker_id).entered();
96
0
            worker.publish(MetricKind::Component, MetricFieldData::default())?;
97
        }
98
0
        Ok(MetricPublishKnownKindData::Component)
99
0
    }
100
}
101
102
/// A collection of workers that are available to run tasks.
103
#[derive(MetricsComponent)]
104
struct ApiWorkerSchedulerImpl {
105
    /// A `LruCache` of workers available based on `allocation_strategy`.
106
    #[metric(group = "workers")]
107
    workers: Workers,
108
109
    /// The worker state manager.
110
    #[metric(group = "worker_state_manager")]
111
    worker_state_manager: Arc<dyn WorkerStateManager>,
112
    /// The allocation strategy for workers.
113
    allocation_strategy: WorkerAllocationStrategy,
114
    /// A channel to notify the matching engine that the worker pool has changed.
115
    worker_change_notify: Arc<Notify>,
116
    /// Worker registry for tracking worker liveness.
117
    worker_registry: SharedWorkerRegistry,
118
119
    /// Whether the worker scheduler is shutting down.
120
    shutting_down: bool,
121
122
    /// Index for fast worker capability lookup.
123
    /// Used to accelerate `find_worker_for_action` by filtering candidates
124
    /// based on properties before doing linear scan.
125
    capability_index: WorkerCapabilityIndex,
126
}
127
128
impl core::fmt::Debug for ApiWorkerSchedulerImpl {
129
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
130
0
        f.debug_struct("ApiWorkerSchedulerImpl")
131
0
            .field("workers", &self.workers)
132
0
            .field("allocation_strategy", &self.allocation_strategy)
133
0
            .field("worker_change_notify", &self.worker_change_notify)
134
0
            .field(
135
0
                "capability_index_size",
136
0
                &self.capability_index.worker_count(),
137
0
            )
138
0
            .field("worker_registry", &self.worker_registry)
139
0
            .finish_non_exhaustive()
140
0
    }
141
}
142
143
impl ApiWorkerSchedulerImpl {
144
    /// Refreshes the lifetime of the worker with the given timestamp.
145
    ///
146
    /// Instead of sending N keepalive messages (one per operation),
147
    /// we now send a single worker heartbeat. The worker registry tracks worker liveness,
148
    /// and timeout detection checks the worker's `last_seen` instead of per-operation timestamps.
149
    ///
150
    /// Note: This only updates the local worker state. The worker registry is updated
151
    /// separately after releasing the inner lock to reduce contention.
152
2
    fn refresh_lifetime(
153
2
        &mut self,
154
2
        worker_id: &WorkerId,
155
2
        timestamp: WorkerTimestamp,
156
2
    ) -> Result<(), Error> {
157
2
        let worker = self.workers.0.peek_mut(worker_id).ok_or_else(|| 
{0
158
0
            make_input_err!(
159
                "Worker not found in worker map in refresh_lifetime() {}",
160
                worker_id
161
            )
162
0
        })?;
163
0
        error_if!(
164
2
            worker.last_update_timestamp > timestamp,
  Branch (164:13): [True: 0, False: 2]
  Branch (164:13): [Folded - Ignored]
165
            "Worker already had a timestamp of {}, but tried to update it with {}",
166
            worker.last_update_timestamp,
167
            timestamp
168
        );
169
2
        worker.last_update_timestamp = timestamp;
170
171
2
        trace!(
172
            ?worker_id,
173
2
            running_operations = worker.running_action_infos.len(),
174
2
            "Worker keepalive received"
175
        );
176
177
2
        Ok(())
178
2
    }
179
180
    /// Adds a worker to the pool.
181
    /// Note: This function will not do any task matching.
182
36
    fn add_worker(&mut self, worker: Worker) -> Result<(), Error> {
183
36
        let worker_id = worker.id.clone();
184
36
        let platform_properties = worker.platform_properties.clone();
185
36
        self.workers.put(worker_id.clone(), worker);
186
187
        // Add to capability index for fast matching
188
36
        self.capability_index
189
36
            .add_worker(&worker_id, &platform_properties);
190
191
        // Worker is not cloneable, and we do not want to send the initial connection results until
192
        // we have added it to the map, or we might get some strange race conditions due to the way
193
        // the multi-threaded runtime works.
194
36
        let worker = self.workers.peek_mut(&worker_id).unwrap();
195
36
        let res = worker
196
36
            .send_initial_connection_result()
197
36
            .err_tip(|| "Failed to send initial connection result to worker");
198
36
        if let Err(
err0
) = &res {
  Branch (198:16): [True: 0, False: 36]
  Branch (198:16): [Folded - Ignored]
199
0
            error!(
200
                ?worker_id,
201
                ?err,
202
0
                "Worker connection appears to have been closed while adding to pool"
203
            );
204
36
        }
205
36
        self.worker_change_notify.notify_one();
206
36
        res
207
36
    }
208
209
    /// Removes worker from pool.
210
    /// Note: The caller is responsible for any rescheduling of any tasks that might be
211
    /// running.
212
10
    fn remove_worker(&mut self, worker_id: &WorkerId) -> Option<Worker> {
213
        // Remove from capability index
214
10
        self.capability_index.remove_worker(worker_id);
215
216
10
        let result = self.workers.pop(worker_id);
217
10
        self.worker_change_notify.notify_one();
218
10
        result
219
10
    }
220
221
    /// Sets if the worker is draining or not.
222
2
    async fn set_drain_worker(
223
2
        &mut self,
224
2
        worker_id: &WorkerId,
225
2
        is_draining: bool,
226
2
    ) -> Result<(), Error> {
227
2
        let worker = self
228
2
            .workers
229
2
            .get_mut(worker_id)
230
2
            .err_tip(|| format!(
"Worker {worker_id} doesn't exist in the pool"0
))
?0
;
231
2
        worker.is_draining = is_draining;
232
2
        self.worker_change_notify.notify_one();
233
2
        Ok(())
234
2
    }
235
236
52
    fn inner_find_worker_for_action(
237
52
        &self,
238
52
        platform_properties: &PlatformProperties,
239
52
        full_worker_logging: bool,
240
52
    ) -> Option<WorkerId> {
241
        // Use capability index to get candidate workers that match STATIC properties
242
        // (Exact, Unknown) and have the required property keys (Priority, Minimum).
243
        // This reduces complexity from O(W × P) to O(P × log(W)) for exact properties.
244
52
        let candidates = self
245
52
            .capability_index
246
52
            .find_matching_workers(platform_properties, full_worker_logging);
247
248
52
        if candidates.is_empty() {
  Branch (248:12): [True: 14, False: 38]
  Branch (248:12): [Folded - Ignored]
249
14
            if full_worker_logging {
  Branch (249:16): [True: 1, False: 13]
  Branch (249:16): [Folded - Ignored]
250
1
                info!("No workers in capability index match required properties");
251
13
            }
252
14
            return None;
253
38
        }
254
255
        // Check function for availability AND dynamic Minimum property verification.
256
        // The index only does presence checks for Minimum properties since their
257
        // values change dynamically as jobs are assigned to workers.
258
38
        let worker_matches = |(worker_id, w): &(&WorkerId, &Worker)| -> bool {
259
38
            if !w.can_accept_work() {
  Branch (259:16): [True: 2, False: 36]
  Branch (259:16): [Folded - Ignored]
260
2
                if full_worker_logging {
  Branch (260:20): [True: 1, False: 1]
  Branch (260:20): [Folded - Ignored]
261
1
                    info!(
262
1
                        "Worker {worker_id} cannot accept work: is_paused={}, is_draining={}, inflight={}/{}",
263
                        w.is_paused,
264
                        w.is_draining,
265
1
                        w.running_action_infos.len(),
266
                        w.max_inflight_tasks
267
                    );
268
1
                }
269
2
                return false;
270
36
            }
271
272
            // Verify Minimum properties at runtime (their values are dynamic)
273
36
            if !platform_properties.is_satisfied_by(&w.platform_properties, full_worker_logging) {
  Branch (273:16): [True: 3, False: 33]
  Branch (273:16): [Folded - Ignored]
274
3
                return false;
275
33
            }
276
277
33
            true
278
38
        };
279
280
        // Now check constraints on filtered candidates.
281
        // Iterate in LRU order based on allocation strategy.
282
38
        let workers_iter = self.workers.iter();
283
284
38
        match self.allocation_strategy {
285
            // Use rfind to get the least recently used that satisfies the properties.
286
38
            WorkerAllocationStrategy::LeastRecentlyUsed => workers_iter
287
38
                .rev()
288
39
                .
filter38
(|(worker_id, _)| candidates.contains(worker_id))
289
38
                .find(&worker_matches)
290
38
                .map(|(_, w)| 
w.id33
.
clone33
()),
291
292
            // Use find to get the most recently used that satisfies the properties.
293
0
            WorkerAllocationStrategy::MostRecentlyUsed => workers_iter
294
0
                .filter(|(worker_id, _)| candidates.contains(worker_id))
295
0
                .find(&worker_matches)
296
0
                .map(|(_, w)| w.id.clone()),
297
        }
298
52
    }
299
300
10
    async fn update_action(
301
10
        &mut self,
302
10
        worker_id: &WorkerId,
303
10
        operation_id: &OperationId,
304
10
        update: UpdateOperationType,
305
10
    ) -> Result<(), Error> {
306
10
        let worker = self.workers.get_mut(worker_id).err_tip(|| 
{0
307
0
            format!("Worker {worker_id} does not exist in SimpleScheduler::update_action")
308
0
        })?;
309
310
        // Ensure the worker is supposed to be running the operation.
311
10
        if !worker.running_action_infos.contains_key(operation_id) {
  Branch (311:12): [True: 1, False: 9]
  Branch (311:12): [Folded - Ignored]
312
1
            let err = make_err!(
313
1
                Code::Internal,
314
                "Operation {operation_id} should not be running on worker {worker_id} in SimpleScheduler::update_action"
315
            );
316
1
            return Result::<(), _>::Err(err.clone())
317
1
                .merge(self.immediate_evict_worker(worker_id, err, false).await);
318
9
        }
319
320
9
        let (is_finished, due_to_backpressure) = match &update {
321
6
            UpdateOperationType::UpdateWithActionStage(action_stage) => {
322
6
                (action_stage.is_finished(), false)
323
            }
324
0
            UpdateOperationType::KeepAlive => (false, false),
325
3
            UpdateOperationType::UpdateWithError(err) => {
326
3
                (true, err.code == Code::ResourceExhausted)
327
            }
328
0
            UpdateOperationType::UpdateWithDisconnect => (true, false),
329
            UpdateOperationType::ExecutionComplete => {
330
                // No update here, just restoring platform properties.
331
0
                worker.execution_complete(operation_id);
332
0
                self.worker_change_notify.notify_one();
333
0
                return Ok(());
334
            }
335
        };
336
337
        // Update the operation in the worker state manager.
338
        {
339
9
            let 
update_operation_res8
= self
340
9
                .worker_state_manager
341
9
                .update_operation(operation_id, worker_id, update)
342
9
                .await
343
8
                .err_tip(|| "in update_operation on SimpleScheduler::update_action");
344
8
            if let Err(
err0
) = update_operation_res {
  Branch (344:20): [True: 0, False: 8]
  Branch (344:20): [Folded - Ignored]
345
0
                error!(
346
                    %operation_id,
347
                    ?worker_id,
348
                    ?err,
349
0
                    "Failed to update_operation on update_action"
350
                );
351
0
                return Err(err);
352
8
            }
353
        }
354
355
8
        if !is_finished {
  Branch (355:12): [True: 0, False: 8]
  Branch (355:12): [Folded - Ignored]
356
0
            return Ok(());
357
8
        }
358
359
        // Clear this action from the current worker if finished.
360
8
        let complete_action_res = {
361
8
            let was_paused = !worker.can_accept_work();
362
363
            // Note: We need to run this before dealing with backpressure logic.
364
8
            let complete_action_res = worker.complete_action(operation_id).await;
365
366
            // Only pause if there's an action still waiting that will unpause.
367
8
            if (was_paused || due_to_backpressure) && 
worker0
.
has_actions0
() {
  Branch (367:17): [True: 0, False: 8]
  Branch (367:31): [True: 0, False: 8]
  Branch (367:55): [True: 0, False: 0]
  Branch (367:17): [Folded - Ignored]
  Branch (367:31): [Folded - Ignored]
  Branch (367:55): [Folded - Ignored]
368
0
                worker.is_paused = true;
369
8
            }
370
8
            complete_action_res
371
        };
372
373
8
        self.worker_change_notify.notify_one();
374
375
8
        complete_action_res
376
9
    }
377
378
    /// Notifies the specified worker to run the given action and handles errors by evicting
379
    /// the worker if the notification fails.
380
33
    async fn worker_notify_run_action(
381
33
        &mut self,
382
33
        worker_id: WorkerId,
383
33
        operation_id: OperationId,
384
33
        action_info: ActionInfoWithProps,
385
33
    ) -> Result<(), Error> {
386
33
        if let Some(worker) = self.workers.get_mut(&worker_id) {
  Branch (386:16): [True: 0, False: 0]
  Branch (386:16): [Folded - Ignored]
  Branch (386:16): [True: 4, False: 0]
  Branch (386:16): [True: 27, False: 0]
  Branch (386:16): [True: 2, False: 0]
387
33
            let notify_worker_result = worker
388
33
                .notify_update(WorkerUpdate::RunAction((operation_id, action_info.clone())))
389
33
                .await;
390
391
33
            if let Err(
notify_worker_result1
) = notify_worker_result {
  Branch (391:20): [True: 0, False: 0]
  Branch (391:20): [Folded - Ignored]
  Branch (391:20): [True: 0, False: 4]
  Branch (391:20): [True: 1, False: 26]
  Branch (391:20): [True: 0, False: 2]
392
1
                warn!(
393
                    ?worker_id,
394
                    ?action_info,
395
                    ?notify_worker_result,
396
1
                    "Worker command failed, removing worker",
397
                );
398
399
                // A slightly nasty way of figuring out that the worker disconnected
400
                // from send_msg_to_worker without introducing complexity to the
401
                // code path from here to there.
402
1
                let is_disconnect = notify_worker_result.code == Code::Internal
  Branch (402:37): [True: 0, False: 0]
  Branch (402:37): [Folded - Ignored]
  Branch (402:37): [True: 0, False: 0]
  Branch (402:37): [True: 1, False: 0]
  Branch (402:37): [True: 0, False: 0]
403
1
                    && notify_worker_result.messages.len() == 1
  Branch (403:24): [True: 0, False: 0]
  Branch (403:24): [Folded - Ignored]
  Branch (403:24): [True: 0, False: 0]
  Branch (403:24): [True: 1, False: 0]
  Branch (403:24): [True: 0, False: 0]
404
1
                    && notify_worker_result.messages[0] == "Worker Disconnected";
405
406
1
                let err = make_err!(
407
1
                    Code::Internal,
408
                    "Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}",
409
                );
410
411
1
                return Result::<(), _>::Err(err.clone()).merge(
412
1
                    self.immediate_evict_worker(&worker_id, err, is_disconnect)
413
1
                        .await,
414
                );
415
32
            }
416
32
            Ok(())
417
        } else {
418
0
            warn!(
419
                ?worker_id,
420
                %operation_id,
421
                ?action_info,
422
0
                "Worker not found in worker map in worker_notify_run_action"
423
            );
424
            // Ensure the operation is put back to queued state.
425
0
            self.worker_state_manager
426
0
                .update_operation(
427
0
                    &operation_id,
428
0
                    &worker_id,
429
0
                    UpdateOperationType::UpdateWithDisconnect,
430
0
                )
431
0
                .await
432
        }
433
33
    }
434
435
    /// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it.
436
10
    async fn immediate_evict_worker(
437
10
        &mut self,
438
10
        worker_id: &WorkerId,
439
10
        err: Error,
440
10
        is_disconnect: bool,
441
10
    ) -> Result<(), Error> {
442
10
        let mut result = Ok(());
443
10
        if let Some(mut worker) = self.remove_worker(worker_id) {
  Branch (443:16): [True: 9, False: 0]
  Branch (443:16): [Folded - Ignored]
  Branch (443:16): [True: 0, False: 0]
  Branch (443:16): [True: 1, False: 0]
  Branch (443:16): [True: 0, False: 0]
444
            // We don't care if we fail to send message to worker, this is only a best attempt.
445
10
            drop(worker.notify_update(WorkerUpdate::Disconnect).await);
446
10
            let update = if is_disconnect {
  Branch (446:29): [True: 0, False: 9]
  Branch (446:29): [Folded - Ignored]
  Branch (446:29): [True: 0, False: 0]
  Branch (446:29): [True: 0, False: 1]
  Branch (446:29): [True: 0, False: 0]
447
0
                UpdateOperationType::UpdateWithDisconnect
448
            } else {
449
10
                UpdateOperationType::UpdateWithError(err)
450
            };
451
10
            for (
operation_id8
, _) in worker.running_action_infos.drain() {
452
8
                result = result.merge(
453
8
                    self.worker_state_manager
454
8
                        .update_operation(&operation_id, worker_id, update.clone())
455
8
                        .await,
456
                );
457
            }
458
0
        }
459
        // Note: Calling this many time is very cheap, it'll only trigger `do_try_match` once.
460
        // TODO(palfrey) This should be moved to inside the Workers struct.
461
10
        self.worker_change_notify.notify_one();
462
10
        result
463
10
    }
464
}
465
466
#[derive(Debug, MetricsComponent)]
467
pub struct ApiWorkerScheduler {
468
    #[metric]
469
    inner: Mutex<ApiWorkerSchedulerImpl>,
470
    #[metric(group = "platform_property_manager")]
471
    platform_property_manager: Arc<PlatformPropertyManager>,
472
473
    #[metric(
474
        help = "Timeout of how long to evict workers if no response in this given amount of time in seconds."
475
    )]
476
    worker_timeout_s: u64,
477
    /// Shared worker registry for checking worker liveness.
478
    worker_registry: SharedWorkerRegistry,
479
480
    /// Performance metrics for observability.
481
    metrics: Arc<SchedulerMetrics>,
482
}
483
484
impl ApiWorkerScheduler {
485
31
    pub fn new(
486
31
        worker_state_manager: Arc<dyn WorkerStateManager>,
487
31
        platform_property_manager: Arc<PlatformPropertyManager>,
488
31
        allocation_strategy: WorkerAllocationStrategy,
489
31
        worker_change_notify: Arc<Notify>,
490
31
        worker_timeout_s: u64,
491
31
        worker_registry: SharedWorkerRegistry,
492
31
    ) -> Arc<Self> {
493
31
        Arc::new(Self {
494
31
            inner: Mutex::new(ApiWorkerSchedulerImpl {
495
31
                workers: Workers(LruCache::unbounded()),
496
31
                worker_state_manager: worker_state_manager.clone(),
497
31
                allocation_strategy,
498
31
                worker_change_notify,
499
31
                worker_registry: worker_registry.clone(),
500
31
                shutting_down: false,
501
31
                capability_index: WorkerCapabilityIndex::new(),
502
31
            }),
503
31
            platform_property_manager,
504
31
            worker_timeout_s,
505
31
            worker_registry,
506
31
            metrics: Arc::new(SchedulerMetrics::default()),
507
31
        })
508
31
    }
509
510
    /// Returns a reference to the worker registry.
511
0
    pub const fn worker_registry(&self) -> &SharedWorkerRegistry {
512
0
        &self.worker_registry
513
0
    }
514
515
33
    pub async fn worker_notify_run_action(
516
33
        &self,
517
33
        worker_id: WorkerId,
518
33
        operation_id: OperationId,
519
33
        action_info: ActionInfoWithProps,
520
33
    ) -> Result<(), Error> {
521
33
        self.metrics
522
33
            .actions_dispatched
523
33
            .fetch_add(1, Ordering::Relaxed);
524
33
        let mut inner = self.inner.lock().await;
525
33
        inner
526
33
            .worker_notify_run_action(worker_id, operation_id, action_info)
527
33
            .await
528
33
    }
529
530
    /// Returns the scheduler metrics for observability.
531
    #[must_use]
532
0
    pub const fn get_metrics(&self) -> &Arc<SchedulerMetrics> {
533
0
        &self.metrics
534
0
    }
535
536
    /// Attempts to find a worker that is capable of running this action.
537
    // TODO(palfrey) This algorithm is not very efficient. Simple testing using a tree-like
538
    // structure showed worse performance on a 10_000 worker * 7 properties * 1000 queued tasks
539
    // simulation of worst cases in a single threaded environment.
540
52
    pub async fn find_worker_for_action(
541
52
        &self,
542
52
        platform_properties: &PlatformProperties,
543
52
        full_worker_logging: bool,
544
52
    ) -> Option<WorkerId> {
545
52
        let start = Instant::now();
546
52
        self.metrics
547
52
            .find_worker_calls
548
52
            .fetch_add(1, Ordering::Relaxed);
549
550
52
        let inner = self.inner.lock().await;
551
52
        let worker_count = inner.workers.len() as u64;
552
52
        let result = inner.inner_find_worker_for_action(platform_properties, full_worker_logging);
553
554
        // Track workers iterated (worst case is all workers)
555
52
        self.metrics
556
52
            .workers_iterated
557
52
            .fetch_add(worker_count, Ordering::Relaxed);
558
559
52
        if result.is_some() {
  Branch (559:12): [True: 0, False: 0]
  Branch (559:12): [Folded - Ignored]
  Branch (559:12): [True: 4, False: 1]
  Branch (559:12): [True: 28, False: 17]
  Branch (559:12): [True: 1, False: 1]
560
33
            self.metrics
561
33
                .find_worker_hits
562
33
                .fetch_add(1, Ordering::Relaxed);
563
33
        } else {
564
19
            self.metrics
565
19
                .find_worker_misses
566
19
                .fetch_add(1, Ordering::Relaxed);
567
19
        }
568
569
        #[allow(clippy::cast_possible_truncation)]
570
52
        self.metrics
571
52
            .find_worker_time_ns
572
52
            .fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
573
52
        result
574
52
    }
575
576
    /// Checks to see if the worker exists in the worker pool. Should only be used in unit tests.
577
    #[must_use]
578
7
    
pub async fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool0
{
579
7
        let inner = self.inner.lock().await;
580
7
        inner.workers.contains(worker_id)
581
7
    }
582
583
    /// A unit test function used to send the keep alive message to the worker from the server.
584
0
    pub async fn send_keep_alive_to_worker_for_test(
585
0
        &self,
586
0
        worker_id: &WorkerId,
587
1
    ) -> Result<(), Error> {
588
1
        let mut inner = self.inner.lock().await;
589
1
        let worker = inner.workers.get_mut(worker_id).ok_or_else(|| 
{0
590
0
            make_input_err!("WorkerId '{}' does not exist in workers map", worker_id)
591
0
        })?;
592
1
        worker.keep_alive()
593
1
    }
594
}
595
596
#[async_trait]
597
impl WorkerScheduler for ApiWorkerScheduler {
598
2
    /// Borrows the platform property manager this scheduler was built with.
    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
        &self.platform_property_manager
    }
601
602
36
    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
603
        let worker_id = worker.id.clone();
604
        let worker_timestamp = worker.last_update_timestamp;
605
        let mut inner = self.inner.lock().await;
606
        if inner.shutting_down {
607
            warn!("Rejected worker add during shutdown: {}", worker_id);
608
            return Err(make_err!(
609
                Code::Unavailable,
610
                "Received request to add worker while shutting down"
611
            ));
612
        }
613
        let result = inner
614
            .add_worker(worker)
615
            .err_tip(|| "Error while adding worker, removing from pool");
616
        if let Err(err) = result {
617
            return Result::<(), _>::Err(err.clone())
618
                .merge(inner.immediate_evict_worker(&worker_id, err, false).await);
619
        }
620
621
        let now = UNIX_EPOCH + Duration::from_secs(worker_timestamp);
622
        self.worker_registry.register_worker(&worker_id, now).await;
623
624
        self.metrics.workers_added.fetch_add(1, Ordering::Relaxed);
625
        Ok(())
626
36
    }
627
628
    async fn update_action(
629
        &self,
630
        worker_id: &WorkerId,
631
        operation_id: &OperationId,
632
        update: UpdateOperationType,
633
10
    ) -> Result<(), Error> {
634
        let mut inner = self.inner.lock().await;
635
        inner.update_action(worker_id, operation_id, update).await
636
10
    }
637
638
    async fn worker_keep_alive_received(
639
        &self,
640
        worker_id: &WorkerId,
641
        timestamp: WorkerTimestamp,
642
2
    ) -> Result<(), Error> {
643
        {
644
            let mut inner = self.inner.lock().await;
645
            inner
646
                .refresh_lifetime(worker_id, timestamp)
647
                .err_tip(|| "Error refreshing lifetime in worker_keep_alive_received()")?;
648
        }
649
        let now = UNIX_EPOCH + Duration::from_secs(timestamp);
650
        self.worker_registry
651
            .update_worker_heartbeat(worker_id, now)
652
            .await;
653
        Ok(())
654
2
    }
655
656
6
    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
657
        self.worker_registry.remove_worker(worker_id).await;
658
659
        let mut inner = self.inner.lock().await;
660
        inner
661
            .immediate_evict_worker(
662
                worker_id,
663
                make_err!(Code::Internal, "Received request to remove worker"),
664
                false,
665
            )
666
            .await
667
6
    }
668
669
0
    async fn shutdown(&self, shutdown_guard: ShutdownGuard) {
670
        let mut inner = self.inner.lock().await;
671
        inner.shutting_down = true; // should reject further worker registration
672
        while let Some(worker_id) = inner
673
            .workers
674
            .peek_lru()
675
0
            .map(|(worker_id, _worker)| worker_id.clone())
676
        {
677
            if let Err(err) = inner
678
                .immediate_evict_worker(
679
                    &worker_id,
680
                    make_err!(Code::Internal, "Scheduler shutdown"),
681
                    true,
682
                )
683
                .await
684
            {
685
                error!(?err, "Error evicting worker on shutdown.");
686
            }
687
        }
688
        drop(shutdown_guard);
689
0
    }
690
691
5
    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
692
        // Check worker liveness using both the local timestamp (from LRU)
693
        // and the worker registry. A worker is alive if either source says it's alive.
694
        let timeout = Duration::from_secs(self.worker_timeout_s);
695
        let now = UNIX_EPOCH + Duration::from_secs(now_timestamp);
696
        let timeout_threshold = now_timestamp.saturating_sub(self.worker_timeout_s);
697
698
        let workers_to_check: Vec<(WorkerId, bool)> = {
699
            let inner = self.inner.lock().await;
700
            inner
701
                .workers
702
                .iter()
703
6
                .map(|(worker_id, worker)| {
704
6
                    let local_alive = worker.last_update_timestamp > timeout_threshold;
705
6
                    (worker_id.clone(), local_alive)
706
6
                })
707
                .collect()
708
        };
709
710
        let mut worker_ids_to_remove = Vec::new();
711
        for (worker_id, local_alive) in workers_to_check {
712
            if local_alive {
713
                continue;
714
            }
715
716
            let registry_alive = self
717
                .worker_registry
718
                .is_worker_alive(&worker_id, timeout, now)
719
                .await;
720
721
            if !registry_alive {
722
                trace!(
723
                    ?worker_id,
724
                    local_alive,
725
                    registry_alive,
726
                    timeout_threshold,
727
                    "Worker timed out - neither local nor registry shows alive"
728
                );
729
                worker_ids_to_remove.push(worker_id);
730
            }
731
        }
732
733
        if worker_ids_to_remove.is_empty() {
734
            return Ok(());
735
        }
736
737
        let mut inner = self.inner.lock().await;
738
        let mut result = Ok(());
739
740
        for worker_id in &worker_ids_to_remove {
741
            warn!(?worker_id, "Worker timed out, removing from pool");
742
            result = result.merge(
743
                inner
744
                    .immediate_evict_worker(
745
                        worker_id,
746
                        make_err!(
747
                            Code::Internal,
748
                            "Worker {worker_id} timed out, removing from pool"
749
                        ),
750
                        false,
751
                    )
752
                    .await,
753
            );
754
        }
755
756
        result
757
5
    }
758
759
2
    /// Takes the inner lock and sets the draining flag of the given worker.
    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
        let mut state = self.inner.lock().await;
        let outcome = state.set_drain_worker(worker_id, is_draining).await;
        outcome
    }
763
}
764
765
// Empty impl: no trait items are overridden here, so whatever the trait
// provides by default applies to `ApiWorkerScheduler`.
impl RootMetricsComponent for ApiWorkerScheduler {}