Coverage Report

Created: 2026-03-05 11:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/api_worker_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::ops::{Deref, DerefMut};
16
use core::sync::atomic::{AtomicU64, Ordering};
17
use core::time::Duration;
18
use std::sync::Arc;
19
use std::time::{Instant, UNIX_EPOCH};
20
21
use async_lock::Mutex;
22
use lru::LruCache;
23
use nativelink_config::schedulers::WorkerAllocationStrategy;
24
use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err};
25
use nativelink_metric::{
26
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
27
    RootMetricsComponent, group,
28
};
29
use nativelink_util::action_messages::{OperationId, WorkerId};
30
use nativelink_util::operation_state_manager::{UpdateOperationType, WorkerStateManager};
31
use nativelink_util::platform_properties::PlatformProperties;
32
use nativelink_util::shutdown_guard::ShutdownGuard;
33
use tokio::sync::Notify;
34
use tonic::async_trait;
35
use tracing::{error, info, trace, warn};
36
37
/// Lock-free counters describing how the API worker scheduler is behaving.
///
/// Every field is a monotonically increasing `AtomicU64`, so the struct can be shared
/// (e.g. behind an `Arc`) and bumped from any task without taking the scheduler lock.
#[derive(Default, Debug)]
pub struct SchedulerMetrics {
    /// How many workers have been added to the pool.
    pub workers_added: AtomicU64,
    /// How many workers have been removed from the pool.
    pub workers_removed: AtomicU64,
    /// How many times `find_worker_for_action` has been invoked.
    pub find_worker_calls: AtomicU64,
    /// How many `find_worker_for_action` calls produced a worker.
    pub find_worker_hits: AtomicU64,
    /// How many `find_worker_for_action` calls found no suitable worker.
    pub find_worker_misses: AtomicU64,
    /// Cumulative wall-clock time spent in `find_worker_for_action`, in nanoseconds.
    pub find_worker_time_ns: AtomicU64,
    /// Cumulative count of workers considered by find operations.
    pub workers_iterated: AtomicU64,
    /// How many actions have been dispatched to workers.
    pub actions_dispatched: AtomicU64,
    /// How many worker keep-alives have been processed.
    pub keep_alive_updates: AtomicU64,
    /// How many workers have been evicted for timing out.
    pub worker_timeouts: AtomicU64,
}
61
62
use crate::platform_property_manager::PlatformPropertyManager;
63
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp, WorkerUpdate};
64
use crate::worker_capability_index::WorkerCapabilityIndex;
65
use crate::worker_registry::SharedWorkerRegistry;
66
use crate::worker_scheduler::WorkerScheduler;
67
68
#[derive(Debug)]
69
struct Workers(LruCache<WorkerId, Worker>);
70
71
impl Deref for Workers {
72
    type Target = LruCache<WorkerId, Worker>;
73
74
168
    fn deref(&self) -> &Self::Target {
75
168
        &self.0
76
168
    }
77
}
78
79
impl DerefMut for Workers {
80
130
    fn deref_mut(&mut self) -> &mut Self::Target {
81
130
        &mut self.0
82
130
    }
83
}
84
85
// Note: This could not be a derive macro because this derive-macro
86
// does not support LruCache and nameless field structs.
87
impl MetricsComponent for Workers {
88
0
    fn publish(
89
0
        &self,
90
0
        _kind: MetricKind,
91
0
        _field_metadata: MetricFieldData,
92
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
93
0
        let _enter = group!("workers").entered();
94
0
        for (worker_id, worker) in self.iter() {
95
0
            let _enter = group!(worker_id).entered();
96
0
            worker.publish(MetricKind::Component, MetricFieldData::default())?;
97
        }
98
0
        Ok(MetricPublishKnownKindData::Component)
99
0
    }
100
}
101
102
/// A collection of workers that are available to run tasks.
103
#[derive(MetricsComponent)]
104
struct ApiWorkerSchedulerImpl {
105
    /// A `LruCache` of workers available based on `allocation_strategy`.
106
    #[metric(group = "workers")]
107
    workers: Workers,
108
109
    /// The worker state manager.
110
    #[metric(group = "worker_state_manager")]
111
    worker_state_manager: Arc<dyn WorkerStateManager>,
112
    /// The allocation strategy for workers.
113
    allocation_strategy: WorkerAllocationStrategy,
114
    /// A channel to notify the matching engine that the worker pool has changed.
115
    worker_change_notify: Arc<Notify>,
116
    /// Worker registry for tracking worker liveness.
117
    worker_registry: SharedWorkerRegistry,
118
119
    /// Whether the worker scheduler is shutting down.
120
    shutting_down: bool,
121
122
    /// Index for fast worker capability lookup.
123
    /// Used to accelerate `find_worker_for_action` by filtering candidates
124
    /// based on properties before doing linear scan.
125
    capability_index: WorkerCapabilityIndex,
126
}
127
128
impl core::fmt::Debug for ApiWorkerSchedulerImpl {
129
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
130
0
        f.debug_struct("ApiWorkerSchedulerImpl")
131
0
            .field("workers", &self.workers)
132
0
            .field("allocation_strategy", &self.allocation_strategy)
133
0
            .field("worker_change_notify", &self.worker_change_notify)
134
0
            .field(
135
0
                "capability_index_size",
136
0
                &self.capability_index.worker_count(),
137
0
            )
138
0
            .field("worker_registry", &self.worker_registry)
139
0
            .finish_non_exhaustive()
140
0
    }
141
}
142
143
impl ApiWorkerSchedulerImpl {
144
    /// Refreshes the lifetime of the worker with the given timestamp.
145
    ///
146
    /// Instead of sending N keepalive messages (one per operation),
147
    /// we now send a single worker heartbeat. The worker registry tracks worker liveness,
148
    /// and timeout detection checks the worker's `last_seen` instead of per-operation timestamps.
149
    ///
150
    /// Note: This only updates the local worker state. The worker registry is updated
151
    /// separately after releasing the inner lock to reduce contention.
152
2
    fn refresh_lifetime(
153
2
        &mut self,
154
2
        worker_id: &WorkerId,
155
2
        timestamp: WorkerTimestamp,
156
2
    ) -> Result<(), Error> {
157
2
        let worker = self.workers.0.peek_mut(worker_id).ok_or_else(|| 
{0
158
0
            make_input_err!(
159
                "Worker not found in worker map in refresh_lifetime() {}",
160
                worker_id
161
            )
162
0
        })?;
163
0
        error_if!(
164
2
            worker.last_update_timestamp > timestamp,
  Branch (164:13): [True: 0, False: 2]
  Branch (164:13): [Folded - Ignored]
165
            "Worker already had a timestamp of {}, but tried to update it with {}",
166
            worker.last_update_timestamp,
167
            timestamp
168
        );
169
2
        worker.last_update_timestamp = timestamp;
170
171
2
        trace!(
172
            ?worker_id,
173
2
            running_operations = worker.running_action_infos.len(),
174
2
            "Worker keepalive received"
175
        );
176
177
2
        Ok(())
178
2
    }
179
180
    /// Adds a worker to the pool.
181
    /// Note: This function will not do any task matching.
182
37
    fn add_worker(&mut self, worker: Worker) -> Result<(), Error> {
183
37
        let worker_id = worker.id.clone();
184
37
        let platform_properties = worker.platform_properties.clone();
185
37
        self.workers.put(worker_id.clone(), worker);
186
187
        // Add to capability index for fast matching
188
37
        self.capability_index
189
37
            .add_worker(&worker_id, &platform_properties);
190
191
        // Worker is not cloneable, and we do not want to send the initial connection results until
192
        // we have added it to the map, or we might get some strange race conditions due to the way
193
        // the multi-threaded runtime works.
194
37
        let worker = self.workers.peek_mut(&worker_id).unwrap();
195
37
        let res = worker
196
37
            .send_initial_connection_result()
197
37
            .err_tip(|| "Failed to send initial connection result to worker");
198
37
        if let Err(
err0
) = &res {
  Branch (198:16): [True: 0, False: 37]
  Branch (198:16): [Folded - Ignored]
199
0
            error!(
200
                ?worker_id,
201
                ?err,
202
0
                "Worker connection appears to have been closed while adding to pool"
203
            );
204
37
        }
205
37
        self.worker_change_notify.notify_one();
206
37
        res
207
37
    }
208
209
    /// Removes worker from pool.
210
    /// Note: The caller is responsible for any rescheduling of any tasks that might be
211
    /// running.
212
10
    fn remove_worker(&mut self, worker_id: &WorkerId) -> Option<Worker> {
213
        // Remove from capability index
214
10
        self.capability_index.remove_worker(worker_id);
215
216
10
        let result = self.workers.pop(worker_id);
217
10
        self.worker_change_notify.notify_one();
218
10
        result
219
10
    }
220
221
    /// Sets if the worker is draining or not.
222
2
    async fn set_drain_worker(
223
2
        &mut self,
224
2
        worker_id: &WorkerId,
225
2
        is_draining: bool,
226
2
    ) -> Result<(), Error> {
227
2
        let worker = self
228
2
            .workers
229
2
            .get_mut(worker_id)
230
2
            .err_tip(|| format!(
"Worker {worker_id} doesn't exist in the pool"0
))
?0
;
231
2
        worker.is_draining = is_draining;
232
2
        self.worker_change_notify.notify_one();
233
2
        Ok(())
234
2
    }
235
236
57
    fn inner_find_worker_for_action(
237
57
        &self,
238
57
        platform_properties: &PlatformProperties,
239
57
        full_worker_logging: bool,
240
57
    ) -> Option<WorkerId> {
241
        // Do a fast check to see if any workers are available at all for work allocation
242
57
        if !self.workers.iter().any(|(_, w)| 
w45
.
can_accept_work45
()) {
  Branch (242:12): [True: 14, False: 43]
  Branch (242:12): [Folded - Ignored]
243
14
            if full_worker_logging {
  Branch (243:16): [True: 2, False: 12]
  Branch (243:16): [Folded - Ignored]
244
2
                info!("All workers are fully allocated");
245
12
            }
246
14
            return None;
247
43
        }
248
249
        // Use capability index to get candidate workers that match STATIC properties
250
        // (Exact, Unknown) and have the required property keys (Priority, Minimum).
251
        // This reduces complexity from O(W × P) to O(P × log(W)) for exact properties.
252
43
        let candidates = self
253
43
            .capability_index
254
43
            .find_matching_workers(platform_properties, full_worker_logging);
255
256
43
        if candidates.is_empty() {
  Branch (256:12): [True: 1, False: 42]
  Branch (256:12): [Folded - Ignored]
257
1
            if full_worker_logging {
  Branch (257:16): [True: 0, False: 1]
  Branch (257:16): [Folded - Ignored]
258
0
                info!("No workers in capability index match required properties");
259
1
            }
260
1
            return None;
261
42
        }
262
263
        // Check function for availability AND dynamic Minimum property verification.
264
        // The index only does presence checks for Minimum properties since their
265
        // values change dynamically as jobs are assigned to workers.
266
42
        let worker_matches = |(worker_id, w): &(&WorkerId, &Worker)| -> bool {
267
42
            if !w.can_accept_work() {
  Branch (267:16): [True: 0, False: 42]
  Branch (267:16): [Folded - Ignored]
268
0
                if full_worker_logging {
  Branch (268:20): [True: 0, False: 0]
  Branch (268:20): [Folded - Ignored]
269
0
                    info!(
270
0
                        "Worker {worker_id} cannot accept work: is_paused={}, is_draining={}, inflight={}/{}",
271
                        w.is_paused,
272
                        w.is_draining,
273
0
                        w.running_action_infos.len(),
274
                        w.max_inflight_tasks
275
                    );
276
0
                }
277
0
                return false;
278
42
            }
279
280
            // Verify Minimum properties at runtime (their values are dynamic)
281
42
            if !platform_properties.is_satisfied_by(&w.platform_properties, full_worker_logging) {
  Branch (281:16): [True: 5, False: 37]
  Branch (281:16): [Folded - Ignored]
282
5
                return false;
283
37
            }
284
285
37
            true
286
42
        };
287
288
        // Now check constraints on filtered candidates.
289
        // Iterate in LRU order based on allocation strategy.
290
42
        let workers_iter = self.workers.iter();
291
292
42
        let worker_id = match self.allocation_strategy {
293
            // Use rfind to get the least recently used that satisfies the properties.
294
42
            WorkerAllocationStrategy::LeastRecentlyUsed => workers_iter
295
42
                .rev()
296
43
                .
filter42
(|(worker_id, _)| candidates.contains(worker_id))
297
42
                .find(&worker_matches)
298
42
                .map(|(_, w)| 
w.id37
.
clone37
()),
299
300
            // Use find to get the most recently used that satisfies the properties.
301
0
            WorkerAllocationStrategy::MostRecentlyUsed => workers_iter
302
0
                .filter(|(worker_id, _)| candidates.contains(worker_id))
303
0
                .find(&worker_matches)
304
0
                .map(|(_, w)| w.id.clone()),
305
        };
306
42
        if full_worker_logging && 
worker_id8
.
is_none8
() {
  Branch (306:12): [True: 8, False: 34]
  Branch (306:35): [True: 1, False: 7]
  Branch (306:12): [Folded - Ignored]
  Branch (306:35): [Folded - Ignored]
307
1
            warn!("No workers matched!");
308
41
        }
309
42
        worker_id
310
57
    }
311
312
10
    async fn update_action(
313
10
        &mut self,
314
10
        worker_id: &WorkerId,
315
10
        operation_id: &OperationId,
316
10
        update: UpdateOperationType,
317
10
    ) -> Result<(), Error> {
318
10
        let worker = self.workers.get_mut(worker_id).err_tip(|| 
{0
319
0
            format!("Worker {worker_id} does not exist in SimpleScheduler::update_action")
320
0
        })?;
321
322
        // Ensure the worker is supposed to be running the operation.
323
10
        if !worker.running_action_infos.contains_key(operation_id) {
  Branch (323:12): [True: 1, False: 9]
  Branch (323:12): [Folded - Ignored]
324
1
            let err = make_err!(
325
1
                Code::Internal,
326
                "Operation {operation_id} should not be running on worker {worker_id} in SimpleScheduler::update_action"
327
            );
328
1
            return Result::<(), _>::Err(err.clone())
329
1
                .merge(self.immediate_evict_worker(worker_id, err, false).await);
330
9
        }
331
332
9
        let (is_finished, due_to_backpressure) = match &update {
333
6
            UpdateOperationType::UpdateWithActionStage(action_stage) => {
334
6
                (action_stage.is_finished(), false)
335
            }
336
0
            UpdateOperationType::KeepAlive => (false, false),
337
3
            UpdateOperationType::UpdateWithError(err) => {
338
3
                (true, err.code == Code::ResourceExhausted)
339
            }
340
0
            UpdateOperationType::UpdateWithDisconnect => (true, false),
341
            UpdateOperationType::ExecutionComplete => {
342
                // No update here, just restoring platform properties.
343
0
                worker.execution_complete(operation_id);
344
0
                self.worker_change_notify.notify_one();
345
0
                return Ok(());
346
            }
347
        };
348
349
        // Update the operation in the worker state manager.
350
        {
351
9
            let 
update_operation_res8
= self
352
9
                .worker_state_manager
353
9
                .update_operation(operation_id, worker_id, update)
354
9
                .await
355
8
                .err_tip(|| "in update_operation on SimpleScheduler::update_action");
356
8
            if let Err(
err0
) = update_operation_res {
  Branch (356:20): [True: 0, False: 8]
  Branch (356:20): [Folded - Ignored]
357
0
                error!(
358
                    %operation_id,
359
                    ?worker_id,
360
                    ?err,
361
0
                    "Failed to update_operation on update_action"
362
                );
363
0
                return Err(err);
364
8
            }
365
        }
366
367
8
        if !is_finished {
  Branch (367:12): [True: 0, False: 8]
  Branch (367:12): [Folded - Ignored]
368
0
            return Ok(());
369
8
        }
370
371
        // Clear this action from the current worker if finished.
372
8
        let complete_action_res = {
373
            // Note: We need to run this before dealing with backpressure logic.
374
8
            let complete_action_res = worker.complete_action(operation_id).await;
375
376
8
            if (due_to_backpressure || !worker.can_accept_work()) && 
worker0
.
has_actions0
() {
  Branch (376:17): [True: 0, False: 8]
  Branch (376:40): [True: 0, False: 8]
  Branch (376:70): [True: 0, False: 0]
  Branch (376:17): [Folded - Ignored]
  Branch (376:40): [Folded - Ignored]
  Branch (376:70): [Folded - Ignored]
377
0
                worker.is_paused = true;
378
8
            }
379
8
            complete_action_res
380
        };
381
382
8
        self.worker_change_notify.notify_one();
383
384
8
        complete_action_res
385
9
    }
386
387
    /// Notifies the specified worker to run the given action and handles errors by evicting
388
    /// the worker if the notification fails.
389
33
    async fn worker_notify_run_action(
390
33
        &mut self,
391
33
        worker_id: WorkerId,
392
33
        operation_id: OperationId,
393
33
        action_info: ActionInfoWithProps,
394
33
    ) -> Result<(), Error> {
395
33
        if let Some(worker) = self.workers.get_mut(&worker_id) {
  Branch (395:16): [True: 0, False: 0]
  Branch (395:16): [Folded - Ignored]
  Branch (395:16): [True: 4, False: 0]
  Branch (395:16): [True: 27, False: 0]
  Branch (395:16): [True: 2, False: 0]
396
33
            let notify_worker_result = worker
397
33
                .notify_update(WorkerUpdate::RunAction((operation_id, action_info.clone())))
398
33
                .await;
399
400
33
            if let Err(
notify_worker_result1
) = notify_worker_result {
  Branch (400:20): [True: 0, False: 0]
  Branch (400:20): [Folded - Ignored]
  Branch (400:20): [True: 0, False: 4]
  Branch (400:20): [True: 1, False: 26]
  Branch (400:20): [True: 0, False: 2]
401
1
                warn!(
402
                    ?worker_id,
403
                    ?action_info,
404
                    ?notify_worker_result,
405
1
                    "Worker command failed, removing worker",
406
                );
407
408
                // A slightly nasty way of figuring out that the worker disconnected
409
                // from send_msg_to_worker without introducing complexity to the
410
                // code path from here to there.
411
1
                let is_disconnect = notify_worker_result.code == Code::Internal
  Branch (411:37): [True: 0, False: 0]
  Branch (411:37): [Folded - Ignored]
  Branch (411:37): [True: 0, False: 0]
  Branch (411:37): [True: 1, False: 0]
  Branch (411:37): [True: 0, False: 0]
412
1
                    && notify_worker_result.messages.len() == 1
  Branch (412:24): [True: 0, False: 0]
  Branch (412:24): [Folded - Ignored]
  Branch (412:24): [True: 0, False: 0]
  Branch (412:24): [True: 1, False: 0]
  Branch (412:24): [True: 0, False: 0]
413
1
                    && notify_worker_result.messages[0] == "Worker Disconnected";
414
415
1
                let err = make_err!(
416
1
                    Code::Internal,
417
                    "Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}",
418
                );
419
420
1
                return Result::<(), _>::Err(err.clone()).merge(
421
1
                    self.immediate_evict_worker(&worker_id, err, is_disconnect)
422
1
                        .await,
423
                );
424
32
            }
425
32
            Ok(())
426
        } else {
427
0
            warn!(
428
                ?worker_id,
429
                %operation_id,
430
                ?action_info,
431
0
                "Worker not found in worker map in worker_notify_run_action"
432
            );
433
            // Ensure the operation is put back to queued state.
434
0
            self.worker_state_manager
435
0
                .update_operation(
436
0
                    &operation_id,
437
0
                    &worker_id,
438
0
                    UpdateOperationType::UpdateWithDisconnect,
439
0
                )
440
0
                .await
441
        }
442
33
    }
443
444
    /// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it.
445
10
    async fn immediate_evict_worker(
446
10
        &mut self,
447
10
        worker_id: &WorkerId,
448
10
        err: Error,
449
10
        is_disconnect: bool,
450
10
    ) -> Result<(), Error> {
451
10
        let mut result = Ok(());
452
10
        if let Some(mut worker) = self.remove_worker(worker_id) {
  Branch (452:16): [True: 9, False: 0]
  Branch (452:16): [Folded - Ignored]
  Branch (452:16): [True: 0, False: 0]
  Branch (452:16): [True: 1, False: 0]
  Branch (452:16): [True: 0, False: 0]
453
            // We don't care if we fail to send message to worker, this is only a best attempt.
454
10
            drop(worker.notify_update(WorkerUpdate::Disconnect).await);
455
10
            let update = if is_disconnect {
  Branch (455:29): [True: 0, False: 9]
  Branch (455:29): [Folded - Ignored]
  Branch (455:29): [True: 0, False: 0]
  Branch (455:29): [True: 0, False: 1]
  Branch (455:29): [True: 0, False: 0]
456
0
                UpdateOperationType::UpdateWithDisconnect
457
            } else {
458
10
                UpdateOperationType::UpdateWithError(err)
459
            };
460
10
            for (
operation_id8
, _) in worker.running_action_infos.drain() {
461
8
                result = result.merge(
462
8
                    self.worker_state_manager
463
8
                        .update_operation(&operation_id, worker_id, update.clone())
464
8
                        .await,
465
                );
466
            }
467
0
        }
468
        // Note: Calling this many time is very cheap, it'll only trigger `do_try_match` once.
469
        // TODO(palfrey) This should be moved to inside the Workers struct.
470
10
        self.worker_change_notify.notify_one();
471
10
        result
472
10
    }
473
}
474
475
#[derive(Debug, MetricsComponent)]
476
pub struct ApiWorkerScheduler {
477
    #[metric]
478
    inner: Mutex<ApiWorkerSchedulerImpl>,
479
    #[metric(group = "platform_property_manager")]
480
    platform_property_manager: Arc<PlatformPropertyManager>,
481
482
    #[metric(
483
        help = "Timeout of how long to evict workers if no response in this given amount of time in seconds."
484
    )]
485
    worker_timeout_s: u64,
486
    /// Shared worker registry for checking worker liveness.
487
    worker_registry: SharedWorkerRegistry,
488
489
    /// Performance metrics for observability.
490
    metrics: Arc<SchedulerMetrics>,
491
}
492
493
impl ApiWorkerScheduler {
494
32
    pub fn new(
495
32
        worker_state_manager: Arc<dyn WorkerStateManager>,
496
32
        platform_property_manager: Arc<PlatformPropertyManager>,
497
32
        allocation_strategy: WorkerAllocationStrategy,
498
32
        worker_change_notify: Arc<Notify>,
499
32
        worker_timeout_s: u64,
500
32
        worker_registry: SharedWorkerRegistry,
501
32
    ) -> Arc<Self> {
502
32
        Arc::new(Self {
503
32
            inner: Mutex::new(ApiWorkerSchedulerImpl {
504
32
                workers: Workers(LruCache::unbounded()),
505
32
                worker_state_manager,
506
32
                allocation_strategy,
507
32
                worker_change_notify,
508
32
                worker_registry: worker_registry.clone(),
509
32
                shutting_down: false,
510
32
                capability_index: WorkerCapabilityIndex::new(),
511
32
            }),
512
32
            platform_property_manager,
513
32
            worker_timeout_s,
514
32
            worker_registry,
515
32
            metrics: Arc::new(SchedulerMetrics::default()),
516
32
        })
517
32
    }
518
519
    /// Returns a reference to the worker registry.
520
0
    pub const fn worker_registry(&self) -> &SharedWorkerRegistry {
521
0
        &self.worker_registry
522
0
    }
523
524
33
    pub async fn worker_notify_run_action(
525
33
        &self,
526
33
        worker_id: WorkerId,
527
33
        operation_id: OperationId,
528
33
        action_info: ActionInfoWithProps,
529
33
    ) -> Result<(), Error> {
530
33
        self.metrics
531
33
            .actions_dispatched
532
33
            .fetch_add(1, Ordering::Relaxed);
533
33
        let mut inner = self.inner.lock().await;
534
33
        inner
535
33
            .worker_notify_run_action(worker_id, operation_id, action_info)
536
33
            .await
537
33
    }
538
539
    /// Returns the scheduler metrics for observability.
540
    #[must_use]
541
0
    pub const fn get_metrics(&self) -> &Arc<SchedulerMetrics> {
542
0
        &self.metrics
543
0
    }
544
545
    /// Attempts to find a worker that is capable of running this action.
546
    // TODO(palfrey) This algorithm is not very efficient. Simple testing using a tree-like
547
    // structure showed worse performance on a 10_000 worker * 7 properties * 1000 queued tasks
548
    // simulation of worst cases in a single threaded environment.
549
57
    pub async fn find_worker_for_action(
550
57
        &self,
551
57
        platform_properties: &PlatformProperties,
552
57
        full_worker_logging: bool,
553
57
    ) -> Option<WorkerId> {
554
57
        let start = Instant::now();
555
57
        self.metrics
556
57
            .find_worker_calls
557
57
            .fetch_add(1, Ordering::Relaxed);
558
559
57
        let inner = self.inner.lock().await;
560
57
        let worker_count = inner.workers.len() as u64;
561
57
        let result = inner.inner_find_worker_for_action(platform_properties, full_worker_logging);
562
563
        // Track workers iterated (worst case is all workers)
564
57
        self.metrics
565
57
            .workers_iterated
566
57
            .fetch_add(worker_count, Ordering::Relaxed);
567
568
57
        if result.is_some() {
  Branch (568:12): [True: 0, False: 0]
  Branch (568:12): [Folded - Ignored]
  Branch (568:12): [True: 8, False: 0]
  Branch (568:12): [True: 28, False: 19]
  Branch (568:12): [True: 1, False: 1]
569
37
            self.metrics
570
37
                .find_worker_hits
571
37
                .fetch_add(1, Ordering::Relaxed);
572
37
        } else {
573
20
            self.metrics
574
20
                .find_worker_misses
575
20
                .fetch_add(1, Ordering::Relaxed);
576
20
        }
577
578
        #[allow(clippy::cast_possible_truncation)]
579
57
        self.metrics
580
57
            .find_worker_time_ns
581
57
            .fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
582
57
        result
583
57
    }
584
585
    /// Checks to see if the worker exists in the worker pool. Should only be used in unit tests.
586
    #[must_use]
587
7
    pub async fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool {
588
7
        let inner = self.inner.lock().await;
589
7
        inner.workers.contains(worker_id)
590
7
    }
591
592
    /// A unit test function used to send the keep alive message to the worker from the server.
593
0
    pub async fn send_keep_alive_to_worker_for_test(
594
0
        &self,
595
0
        worker_id: &WorkerId,
596
1
    ) -> Result<(), Error> {
597
1
        let mut inner = self.inner.lock().await;
598
1
        let worker = inner.workers.get_mut(worker_id).ok_or_else(|| 
{0
599
0
            make_input_err!("WorkerId '{}' does not exist in workers map", worker_id)
600
0
        })?;
601
1
        worker.keep_alive()
602
1
    }
603
}
604
605
#[async_trait]
606
impl WorkerScheduler for ApiWorkerScheduler {
607
2
    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
608
2
        self.platform_property_manager.as_ref()
609
2
    }
610
611
37
    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
612
        let worker_id = worker.id.clone();
613
        let worker_timestamp = worker.last_update_timestamp;
614
        let mut inner = self.inner.lock().await;
615
        if inner.shutting_down {
616
            warn!("Rejected worker add during shutdown: {}", worker_id);
617
            return Err(make_err!(
618
                Code::Unavailable,
619
                "Received request to add worker while shutting down"
620
            ));
621
        }
622
        let result = inner
623
            .add_worker(worker)
624
            .err_tip(|| "Error while adding worker, removing from pool");
625
        if let Err(err) = result {
626
            return Result::<(), _>::Err(err.clone())
627
                .merge(inner.immediate_evict_worker(&worker_id, err, false).await);
628
        }
629
630
        let now = UNIX_EPOCH + Duration::from_secs(worker_timestamp);
631
        self.worker_registry.register_worker(&worker_id, now).await;
632
633
        self.metrics.workers_added.fetch_add(1, Ordering::Relaxed);
634
        Ok(())
635
37
    }
636
637
    async fn update_action(
638
        &self,
639
        worker_id: &WorkerId,
640
        operation_id: &OperationId,
641
        update: UpdateOperationType,
642
10
    ) -> Result<(), Error> {
643
        let mut inner = self.inner.lock().await;
644
        inner.update_action(worker_id, operation_id, update).await
645
10
    }
646
647
    async fn worker_keep_alive_received(
648
        &self,
649
        worker_id: &WorkerId,
650
        timestamp: WorkerTimestamp,
651
2
    ) -> Result<(), Error> {
652
        {
653
            let mut inner = self.inner.lock().await;
654
            inner
655
                .refresh_lifetime(worker_id, timestamp)
656
                .err_tip(|| "Error refreshing lifetime in worker_keep_alive_received()")?;
657
        }
658
        let now = UNIX_EPOCH + Duration::from_secs(timestamp);
659
        self.worker_registry
660
            .update_worker_heartbeat(worker_id, now)
661
            .await;
662
        Ok(())
663
2
    }
664
665
6
    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
666
        self.worker_registry.remove_worker(worker_id).await;
667
668
        let mut inner = self.inner.lock().await;
669
        inner
670
            .immediate_evict_worker(
671
                worker_id,
672
                make_err!(Code::Internal, "Received request to remove worker"),
673
                false,
674
            )
675
            .await
676
6
    }
677
678
0
    async fn shutdown(&self, shutdown_guard: ShutdownGuard) {
679
        let mut inner = self.inner.lock().await;
680
        inner.shutting_down = true; // should reject further worker registration
681
        while let Some(worker_id) = inner
682
            .workers
683
            .peek_lru()
684
0
            .map(|(worker_id, _worker)| worker_id.clone())
685
        {
686
            if let Err(err) = inner
687
                .immediate_evict_worker(
688
                    &worker_id,
689
                    make_err!(Code::Internal, "Scheduler shutdown"),
690
                    true,
691
                )
692
                .await
693
            {
694
                error!(?err, "Error evicting worker on shutdown.");
695
            }
696
        }
697
        drop(shutdown_guard);
698
0
    }
699
700
5
    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
701
        // Check worker liveness using both the local timestamp (from LRU)
702
        // and the worker registry. A worker is alive if either source says it's alive.
703
        let timeout = Duration::from_secs(self.worker_timeout_s);
704
        let now = UNIX_EPOCH + Duration::from_secs(now_timestamp);
705
        let timeout_threshold = now_timestamp.saturating_sub(self.worker_timeout_s);
706
707
        let workers_to_check: Vec<(WorkerId, bool)> = {
708
            let inner = self.inner.lock().await;
709
            inner
710
                .workers
711
                .iter()
712
6
                .map(|(worker_id, worker)| {
713
6
                    let local_alive = worker.last_update_timestamp > timeout_threshold;
714
6
                    (worker_id.clone(), local_alive)
715
6
                })
716
                .collect()
717
        };
718
719
        let mut worker_ids_to_remove = Vec::new();
720
        for (worker_id, local_alive) in workers_to_check {
721
            if local_alive {
722
                continue;
723
            }
724
725
            let registry_alive = self
726
                .worker_registry
727
                .is_worker_alive(&worker_id, timeout, now)
728
                .await;
729
730
            if !registry_alive {
731
                trace!(
732
                    ?worker_id,
733
                    local_alive,
734
                    registry_alive,
735
                    timeout_threshold,
736
                    "Worker timed out - neither local nor registry shows alive"
737
                );
738
                worker_ids_to_remove.push(worker_id);
739
            }
740
        }
741
742
        if worker_ids_to_remove.is_empty() {
743
            return Ok(());
744
        }
745
746
        let mut inner = self.inner.lock().await;
747
        let mut result = Ok(());
748
749
        for worker_id in &worker_ids_to_remove {
750
            warn!(?worker_id, "Worker timed out, removing from pool");
751
            result = result.merge(
752
                inner
753
                    .immediate_evict_worker(
754
                        worker_id,
755
                        make_err!(
756
                            Code::Internal,
757
                            "Worker {worker_id} timed out, removing from pool"
758
                        ),
759
                        false,
760
                    )
761
                    .await,
762
            );
763
        }
764
765
        result
766
5
    }
767
768
2
    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
769
        let mut inner = self.inner.lock().await;
770
        inner.set_drain_worker(worker_id, is_draining).await
771
2
    }
772
}
773
774
// Marker impl: lets `ApiWorkerScheduler` be registered as a top-level metrics component.
impl RootMetricsComponent for ApiWorkerScheduler {}