/build/source/nativelink-scheduler/src/api_worker_scheduler.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::ops::{Deref, DerefMut}; |
16 | | use core::sync::atomic::{AtomicU64, Ordering}; |
17 | | use core::time::Duration; |
18 | | use std::sync::Arc; |
19 | | use std::time::{Instant, UNIX_EPOCH}; |
20 | | |
21 | | use async_lock::Mutex; |
22 | | use lru::LruCache; |
23 | | use nativelink_config::schedulers::WorkerAllocationStrategy; |
24 | | use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err}; |
25 | | use nativelink_metric::{ |
26 | | MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, |
27 | | RootMetricsComponent, group, |
28 | | }; |
29 | | use nativelink_util::action_messages::{OperationId, WorkerId}; |
30 | | use nativelink_util::operation_state_manager::{UpdateOperationType, WorkerStateManager}; |
31 | | use nativelink_util::platform_properties::PlatformProperties; |
32 | | use nativelink_util::shutdown_guard::ShutdownGuard; |
33 | | use tokio::sync::Notify; |
34 | | use tonic::async_trait; |
35 | | use tracing::{error, info, trace, warn}; |
36 | | |
/// Counters describing scheduler behavior, exported for observability.
///
/// All fields are monotonically increasing and updated with relaxed
/// atomics; they are approximate under concurrency but never decrease.
#[derive(Debug, Default)]
pub struct SchedulerMetrics {
    /// How many workers have been added to the pool.
    pub workers_added: AtomicU64,
    /// How many workers have been removed from the pool.
    pub workers_removed: AtomicU64,
    /// How many times `find_worker_for_action` was invoked.
    pub find_worker_calls: AtomicU64,
    /// How many `find_worker_for_action` calls matched a worker.
    pub find_worker_hits: AtomicU64,
    /// How many `find_worker_for_action` calls found no worker.
    pub find_worker_misses: AtomicU64,
    /// Cumulative time spent inside `find_worker_for_action`, in nanoseconds.
    pub find_worker_time_ns: AtomicU64,
    /// Cumulative number of workers examined during find operations.
    pub workers_iterated: AtomicU64,
    /// How many actions have been dispatched to workers.
    pub actions_dispatched: AtomicU64,
    /// How many keep-alive updates have been processed.
    pub keep_alive_updates: AtomicU64,
    /// How many workers have been evicted due to timeout.
    pub worker_timeouts: AtomicU64,
}
61 | | |
62 | | use crate::platform_property_manager::PlatformPropertyManager; |
63 | | use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp, WorkerUpdate}; |
64 | | use crate::worker_capability_index::WorkerCapabilityIndex; |
65 | | use crate::worker_registry::SharedWorkerRegistry; |
66 | | use crate::worker_scheduler::WorkerScheduler; |
67 | | |
68 | | #[derive(Debug)] |
69 | | struct Workers(LruCache<WorkerId, Worker>); |
70 | | |
71 | | impl Deref for Workers { |
72 | | type Target = LruCache<WorkerId, Worker>; |
73 | | |
74 | 168 | fn deref(&self) -> &Self::Target { |
75 | 168 | &self.0 |
76 | 168 | } |
77 | | } |
78 | | |
79 | | impl DerefMut for Workers { |
80 | 130 | fn deref_mut(&mut self) -> &mut Self::Target { |
81 | 130 | &mut self.0 |
82 | 130 | } |
83 | | } |
84 | | |
85 | | // Note: This could not be a derive macro because this derive-macro |
86 | | // does not support LruCache and nameless field structs. |
87 | | impl MetricsComponent for Workers { |
88 | 0 | fn publish( |
89 | 0 | &self, |
90 | 0 | _kind: MetricKind, |
91 | 0 | _field_metadata: MetricFieldData, |
92 | 0 | ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { |
93 | 0 | let _enter = group!("workers").entered(); |
94 | 0 | for (worker_id, worker) in self.iter() { |
95 | 0 | let _enter = group!(worker_id).entered(); |
96 | 0 | worker.publish(MetricKind::Component, MetricFieldData::default())?; |
97 | | } |
98 | 0 | Ok(MetricPublishKnownKindData::Component) |
99 | 0 | } |
100 | | } |
101 | | |
102 | | /// A collection of workers that are available to run tasks. |
103 | | #[derive(MetricsComponent)] |
104 | | struct ApiWorkerSchedulerImpl { |
105 | | /// A `LruCache` of workers available based on `allocation_strategy`. |
106 | | #[metric(group = "workers")] |
107 | | workers: Workers, |
108 | | |
109 | | /// The worker state manager. |
110 | | #[metric(group = "worker_state_manager")] |
111 | | worker_state_manager: Arc<dyn WorkerStateManager>, |
112 | | /// The allocation strategy for workers. |
113 | | allocation_strategy: WorkerAllocationStrategy, |
114 | | /// A channel to notify the matching engine that the worker pool has changed. |
115 | | worker_change_notify: Arc<Notify>, |
116 | | /// Worker registry for tracking worker liveness. |
117 | | worker_registry: SharedWorkerRegistry, |
118 | | |
119 | | /// Whether the worker scheduler is shutting down. |
120 | | shutting_down: bool, |
121 | | |
122 | | /// Index for fast worker capability lookup. |
123 | | /// Used to accelerate `find_worker_for_action` by filtering candidates |
124 | | /// based on properties before doing linear scan. |
125 | | capability_index: WorkerCapabilityIndex, |
126 | | } |
127 | | |
128 | | impl core::fmt::Debug for ApiWorkerSchedulerImpl { |
129 | 0 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
130 | 0 | f.debug_struct("ApiWorkerSchedulerImpl") |
131 | 0 | .field("workers", &self.workers) |
132 | 0 | .field("allocation_strategy", &self.allocation_strategy) |
133 | 0 | .field("worker_change_notify", &self.worker_change_notify) |
134 | 0 | .field( |
135 | 0 | "capability_index_size", |
136 | 0 | &self.capability_index.worker_count(), |
137 | 0 | ) |
138 | 0 | .field("worker_registry", &self.worker_registry) |
139 | 0 | .finish_non_exhaustive() |
140 | 0 | } |
141 | | } |
142 | | |
143 | | impl ApiWorkerSchedulerImpl { |
144 | | /// Refreshes the lifetime of the worker with the given timestamp. |
145 | | /// |
146 | | /// Instead of sending N keepalive messages (one per operation), |
147 | | /// we now send a single worker heartbeat. The worker registry tracks worker liveness, |
148 | | /// and timeout detection checks the worker's `last_seen` instead of per-operation timestamps. |
149 | | /// |
150 | | /// Note: This only updates the local worker state. The worker registry is updated |
151 | | /// separately after releasing the inner lock to reduce contention. |
152 | 2 | fn refresh_lifetime( |
153 | 2 | &mut self, |
154 | 2 | worker_id: &WorkerId, |
155 | 2 | timestamp: WorkerTimestamp, |
156 | 2 | ) -> Result<(), Error> { |
157 | 2 | let worker = self.workers.0.peek_mut(worker_id).ok_or_else(|| {0 |
158 | 0 | make_input_err!( |
159 | | "Worker not found in worker map in refresh_lifetime() {}", |
160 | | worker_id |
161 | | ) |
162 | 0 | })?; |
163 | 0 | error_if!( |
164 | 2 | worker.last_update_timestamp > timestamp, Branch (164:13): [True: 0, False: 2]
Branch (164:13): [Folded - Ignored]
|
165 | | "Worker already had a timestamp of {}, but tried to update it with {}", |
166 | | worker.last_update_timestamp, |
167 | | timestamp |
168 | | ); |
169 | 2 | worker.last_update_timestamp = timestamp; |
170 | | |
171 | 2 | trace!( |
172 | | ?worker_id, |
173 | 2 | running_operations = worker.running_action_infos.len(), |
174 | 2 | "Worker keepalive received" |
175 | | ); |
176 | | |
177 | 2 | Ok(()) |
178 | 2 | } |
179 | | |
180 | | /// Adds a worker to the pool. |
181 | | /// Note: This function will not do any task matching. |
182 | 37 | fn add_worker(&mut self, worker: Worker) -> Result<(), Error> { |
183 | 37 | let worker_id = worker.id.clone(); |
184 | 37 | let platform_properties = worker.platform_properties.clone(); |
185 | 37 | self.workers.put(worker_id.clone(), worker); |
186 | | |
187 | | // Add to capability index for fast matching |
188 | 37 | self.capability_index |
189 | 37 | .add_worker(&worker_id, &platform_properties); |
190 | | |
191 | | // Worker is not cloneable, and we do not want to send the initial connection results until |
192 | | // we have added it to the map, or we might get some strange race conditions due to the way |
193 | | // the multi-threaded runtime works. |
194 | 37 | let worker = self.workers.peek_mut(&worker_id).unwrap(); |
195 | 37 | let res = worker |
196 | 37 | .send_initial_connection_result() |
197 | 37 | .err_tip(|| "Failed to send initial connection result to worker"); |
198 | 37 | if let Err(err0 ) = &res { Branch (198:16): [True: 0, False: 37]
Branch (198:16): [Folded - Ignored]
|
199 | 0 | error!( |
200 | | ?worker_id, |
201 | | ?err, |
202 | 0 | "Worker connection appears to have been closed while adding to pool" |
203 | | ); |
204 | 37 | } |
205 | 37 | self.worker_change_notify.notify_one(); |
206 | 37 | res |
207 | 37 | } |
208 | | |
209 | | /// Removes worker from pool. |
210 | | /// Note: The caller is responsible for any rescheduling of any tasks that might be |
211 | | /// running. |
212 | 10 | fn remove_worker(&mut self, worker_id: &WorkerId) -> Option<Worker> { |
213 | | // Remove from capability index |
214 | 10 | self.capability_index.remove_worker(worker_id); |
215 | | |
216 | 10 | let result = self.workers.pop(worker_id); |
217 | 10 | self.worker_change_notify.notify_one(); |
218 | 10 | result |
219 | 10 | } |
220 | | |
221 | | /// Sets if the worker is draining or not. |
222 | 2 | async fn set_drain_worker( |
223 | 2 | &mut self, |
224 | 2 | worker_id: &WorkerId, |
225 | 2 | is_draining: bool, |
226 | 2 | ) -> Result<(), Error> { |
227 | 2 | let worker = self |
228 | 2 | .workers |
229 | 2 | .get_mut(worker_id) |
230 | 2 | .err_tip(|| format!("Worker {worker_id} doesn't exist in the pool"0 ))?0 ; |
231 | 2 | worker.is_draining = is_draining; |
232 | 2 | self.worker_change_notify.notify_one(); |
233 | 2 | Ok(()) |
234 | 2 | } |
235 | | |
236 | 57 | fn inner_find_worker_for_action( |
237 | 57 | &self, |
238 | 57 | platform_properties: &PlatformProperties, |
239 | 57 | full_worker_logging: bool, |
240 | 57 | ) -> Option<WorkerId> { |
241 | | // Do a fast check to see if any workers are available at all for work allocation |
242 | 57 | if !self.workers.iter().any(|(_, w)| w45 .can_accept_work45 ()) { Branch (242:12): [True: 14, False: 43]
Branch (242:12): [Folded - Ignored]
|
243 | 14 | if full_worker_logging { Branch (243:16): [True: 2, False: 12]
Branch (243:16): [Folded - Ignored]
|
244 | 2 | info!("All workers are fully allocated"); |
245 | 12 | } |
246 | 14 | return None; |
247 | 43 | } |
248 | | |
249 | | // Use capability index to get candidate workers that match STATIC properties |
250 | | // (Exact, Unknown) and have the required property keys (Priority, Minimum). |
251 | | // This reduces complexity from O(W × P) to O(P × log(W)) for exact properties. |
252 | 43 | let candidates = self |
253 | 43 | .capability_index |
254 | 43 | .find_matching_workers(platform_properties, full_worker_logging); |
255 | | |
256 | 43 | if candidates.is_empty() { Branch (256:12): [True: 1, False: 42]
Branch (256:12): [Folded - Ignored]
|
257 | 1 | if full_worker_logging { Branch (257:16): [True: 0, False: 1]
Branch (257:16): [Folded - Ignored]
|
258 | 0 | info!("No workers in capability index match required properties"); |
259 | 1 | } |
260 | 1 | return None; |
261 | 42 | } |
262 | | |
263 | | // Check function for availability AND dynamic Minimum property verification. |
264 | | // The index only does presence checks for Minimum properties since their |
265 | | // values change dynamically as jobs are assigned to workers. |
266 | 42 | let worker_matches = |(worker_id, w): &(&WorkerId, &Worker)| -> bool { |
267 | 42 | if !w.can_accept_work() { Branch (267:16): [True: 0, False: 42]
Branch (267:16): [Folded - Ignored]
|
268 | 0 | if full_worker_logging { Branch (268:20): [True: 0, False: 0]
Branch (268:20): [Folded - Ignored]
|
269 | 0 | info!( |
270 | 0 | "Worker {worker_id} cannot accept work: is_paused={}, is_draining={}, inflight={}/{}", |
271 | | w.is_paused, |
272 | | w.is_draining, |
273 | 0 | w.running_action_infos.len(), |
274 | | w.max_inflight_tasks |
275 | | ); |
276 | 0 | } |
277 | 0 | return false; |
278 | 42 | } |
279 | | |
280 | | // Verify Minimum properties at runtime (their values are dynamic) |
281 | 42 | if !platform_properties.is_satisfied_by(&w.platform_properties, full_worker_logging) { Branch (281:16): [True: 5, False: 37]
Branch (281:16): [Folded - Ignored]
|
282 | 5 | return false; |
283 | 37 | } |
284 | | |
285 | 37 | true |
286 | 42 | }; |
287 | | |
288 | | // Now check constraints on filtered candidates. |
289 | | // Iterate in LRU order based on allocation strategy. |
290 | 42 | let workers_iter = self.workers.iter(); |
291 | | |
292 | 42 | let worker_id = match self.allocation_strategy { |
293 | | // Use rfind to get the least recently used that satisfies the properties. |
294 | 42 | WorkerAllocationStrategy::LeastRecentlyUsed => workers_iter |
295 | 42 | .rev() |
296 | 43 | .filter42 (|(worker_id, _)| candidates.contains(worker_id)) |
297 | 42 | .find(&worker_matches) |
298 | 42 | .map(|(_, w)| w.id37 .clone37 ()), |
299 | | |
300 | | // Use find to get the most recently used that satisfies the properties. |
301 | 0 | WorkerAllocationStrategy::MostRecentlyUsed => workers_iter |
302 | 0 | .filter(|(worker_id, _)| candidates.contains(worker_id)) |
303 | 0 | .find(&worker_matches) |
304 | 0 | .map(|(_, w)| w.id.clone()), |
305 | | }; |
306 | 42 | if full_worker_logging && worker_id8 .is_none8 () { Branch (306:12): [True: 8, False: 34]
Branch (306:35): [True: 1, False: 7]
Branch (306:12): [Folded - Ignored]
Branch (306:35): [Folded - Ignored]
|
307 | 1 | warn!("No workers matched!"); |
308 | 41 | } |
309 | 42 | worker_id |
310 | 57 | } |
311 | | |
312 | 10 | async fn update_action( |
313 | 10 | &mut self, |
314 | 10 | worker_id: &WorkerId, |
315 | 10 | operation_id: &OperationId, |
316 | 10 | update: UpdateOperationType, |
317 | 10 | ) -> Result<(), Error> { |
318 | 10 | let worker = self.workers.get_mut(worker_id).err_tip(|| {0 |
319 | 0 | format!("Worker {worker_id} does not exist in SimpleScheduler::update_action") |
320 | 0 | })?; |
321 | | |
322 | | // Ensure the worker is supposed to be running the operation. |
323 | 10 | if !worker.running_action_infos.contains_key(operation_id) { Branch (323:12): [True: 1, False: 9]
Branch (323:12): [Folded - Ignored]
|
324 | 1 | let err = make_err!( |
325 | 1 | Code::Internal, |
326 | | "Operation {operation_id} should not be running on worker {worker_id} in SimpleScheduler::update_action" |
327 | | ); |
328 | 1 | return Result::<(), _>::Err(err.clone()) |
329 | 1 | .merge(self.immediate_evict_worker(worker_id, err, false).await); |
330 | 9 | } |
331 | | |
332 | 9 | let (is_finished, due_to_backpressure) = match &update { |
333 | 6 | UpdateOperationType::UpdateWithActionStage(action_stage) => { |
334 | 6 | (action_stage.is_finished(), false) |
335 | | } |
336 | 0 | UpdateOperationType::KeepAlive => (false, false), |
337 | 3 | UpdateOperationType::UpdateWithError(err) => { |
338 | 3 | (true, err.code == Code::ResourceExhausted) |
339 | | } |
340 | 0 | UpdateOperationType::UpdateWithDisconnect => (true, false), |
341 | | UpdateOperationType::ExecutionComplete => { |
342 | | // No update here, just restoring platform properties. |
343 | 0 | worker.execution_complete(operation_id); |
344 | 0 | self.worker_change_notify.notify_one(); |
345 | 0 | return Ok(()); |
346 | | } |
347 | | }; |
348 | | |
349 | | // Update the operation in the worker state manager. |
350 | | { |
351 | 9 | let update_operation_res8 = self |
352 | 9 | .worker_state_manager |
353 | 9 | .update_operation(operation_id, worker_id, update) |
354 | 9 | .await |
355 | 8 | .err_tip(|| "in update_operation on SimpleScheduler::update_action"); |
356 | 8 | if let Err(err0 ) = update_operation_res { Branch (356:20): [True: 0, False: 8]
Branch (356:20): [Folded - Ignored]
|
357 | 0 | error!( |
358 | | %operation_id, |
359 | | ?worker_id, |
360 | | ?err, |
361 | 0 | "Failed to update_operation on update_action" |
362 | | ); |
363 | 0 | return Err(err); |
364 | 8 | } |
365 | | } |
366 | | |
367 | 8 | if !is_finished { Branch (367:12): [True: 0, False: 8]
Branch (367:12): [Folded - Ignored]
|
368 | 0 | return Ok(()); |
369 | 8 | } |
370 | | |
371 | | // Clear this action from the current worker if finished. |
372 | 8 | let complete_action_res = { |
373 | | // Note: We need to run this before dealing with backpressure logic. |
374 | 8 | let complete_action_res = worker.complete_action(operation_id).await; |
375 | | |
376 | 8 | if (due_to_backpressure || !worker.can_accept_work()) && worker0 .has_actions0 () { Branch (376:17): [True: 0, False: 8]
Branch (376:40): [True: 0, False: 8]
Branch (376:70): [True: 0, False: 0]
Branch (376:17): [Folded - Ignored]
Branch (376:40): [Folded - Ignored]
Branch (376:70): [Folded - Ignored]
|
377 | 0 | worker.is_paused = true; |
378 | 8 | } |
379 | 8 | complete_action_res |
380 | | }; |
381 | | |
382 | 8 | self.worker_change_notify.notify_one(); |
383 | | |
384 | 8 | complete_action_res |
385 | 9 | } |
386 | | |
387 | | /// Notifies the specified worker to run the given action and handles errors by evicting |
388 | | /// the worker if the notification fails. |
389 | 33 | async fn worker_notify_run_action( |
390 | 33 | &mut self, |
391 | 33 | worker_id: WorkerId, |
392 | 33 | operation_id: OperationId, |
393 | 33 | action_info: ActionInfoWithProps, |
394 | 33 | ) -> Result<(), Error> { |
395 | 33 | if let Some(worker) = self.workers.get_mut(&worker_id) { Branch (395:16): [True: 0, False: 0]
Branch (395:16): [Folded - Ignored]
Branch (395:16): [True: 4, False: 0]
Branch (395:16): [True: 27, False: 0]
Branch (395:16): [True: 2, False: 0]
|
396 | 33 | let notify_worker_result = worker |
397 | 33 | .notify_update(WorkerUpdate::RunAction((operation_id, action_info.clone()))) |
398 | 33 | .await; |
399 | | |
400 | 33 | if let Err(notify_worker_result1 ) = notify_worker_result { Branch (400:20): [True: 0, False: 0]
Branch (400:20): [Folded - Ignored]
Branch (400:20): [True: 0, False: 4]
Branch (400:20): [True: 1, False: 26]
Branch (400:20): [True: 0, False: 2]
|
401 | 1 | warn!( |
402 | | ?worker_id, |
403 | | ?action_info, |
404 | | ?notify_worker_result, |
405 | 1 | "Worker command failed, removing worker", |
406 | | ); |
407 | | |
408 | | // A slightly nasty way of figuring out that the worker disconnected |
409 | | // from send_msg_to_worker without introducing complexity to the |
410 | | // code path from here to there. |
411 | 1 | let is_disconnect = notify_worker_result.code == Code::Internal Branch (411:37): [True: 0, False: 0]
Branch (411:37): [Folded - Ignored]
Branch (411:37): [True: 0, False: 0]
Branch (411:37): [True: 1, False: 0]
Branch (411:37): [True: 0, False: 0]
|
412 | 1 | && notify_worker_result.messages.len() == 1 Branch (412:24): [True: 0, False: 0]
Branch (412:24): [Folded - Ignored]
Branch (412:24): [True: 0, False: 0]
Branch (412:24): [True: 1, False: 0]
Branch (412:24): [True: 0, False: 0]
|
413 | 1 | && notify_worker_result.messages[0] == "Worker Disconnected"; |
414 | | |
415 | 1 | let err = make_err!( |
416 | 1 | Code::Internal, |
417 | | "Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}", |
418 | | ); |
419 | | |
420 | 1 | return Result::<(), _>::Err(err.clone()).merge( |
421 | 1 | self.immediate_evict_worker(&worker_id, err, is_disconnect) |
422 | 1 | .await, |
423 | | ); |
424 | 32 | } |
425 | 32 | Ok(()) |
426 | | } else { |
427 | 0 | warn!( |
428 | | ?worker_id, |
429 | | %operation_id, |
430 | | ?action_info, |
431 | 0 | "Worker not found in worker map in worker_notify_run_action" |
432 | | ); |
433 | | // Ensure the operation is put back to queued state. |
434 | 0 | self.worker_state_manager |
435 | 0 | .update_operation( |
436 | 0 | &operation_id, |
437 | 0 | &worker_id, |
438 | 0 | UpdateOperationType::UpdateWithDisconnect, |
439 | 0 | ) |
440 | 0 | .await |
441 | | } |
442 | 33 | } |
443 | | |
444 | | /// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it. |
445 | 10 | async fn immediate_evict_worker( |
446 | 10 | &mut self, |
447 | 10 | worker_id: &WorkerId, |
448 | 10 | err: Error, |
449 | 10 | is_disconnect: bool, |
450 | 10 | ) -> Result<(), Error> { |
451 | 10 | let mut result = Ok(()); |
452 | 10 | if let Some(mut worker) = self.remove_worker(worker_id) { Branch (452:16): [True: 9, False: 0]
Branch (452:16): [Folded - Ignored]
Branch (452:16): [True: 0, False: 0]
Branch (452:16): [True: 1, False: 0]
Branch (452:16): [True: 0, False: 0]
|
453 | | // We don't care if we fail to send message to worker, this is only a best attempt. |
454 | 10 | drop(worker.notify_update(WorkerUpdate::Disconnect).await); |
455 | 10 | let update = if is_disconnect { Branch (455:29): [True: 0, False: 9]
Branch (455:29): [Folded - Ignored]
Branch (455:29): [True: 0, False: 0]
Branch (455:29): [True: 0, False: 1]
Branch (455:29): [True: 0, False: 0]
|
456 | 0 | UpdateOperationType::UpdateWithDisconnect |
457 | | } else { |
458 | 10 | UpdateOperationType::UpdateWithError(err) |
459 | | }; |
460 | 10 | for (operation_id8 , _) in worker.running_action_infos.drain() { |
461 | 8 | result = result.merge( |
462 | 8 | self.worker_state_manager |
463 | 8 | .update_operation(&operation_id, worker_id, update.clone()) |
464 | 8 | .await, |
465 | | ); |
466 | | } |
467 | 0 | } |
468 | | // Note: Calling this many time is very cheap, it'll only trigger `do_try_match` once. |
469 | | // TODO(palfrey) This should be moved to inside the Workers struct. |
470 | 10 | self.worker_change_notify.notify_one(); |
471 | 10 | result |
472 | 10 | } |
473 | | } |
474 | | |
475 | | #[derive(Debug, MetricsComponent)] |
476 | | pub struct ApiWorkerScheduler { |
477 | | #[metric] |
478 | | inner: Mutex<ApiWorkerSchedulerImpl>, |
479 | | #[metric(group = "platform_property_manager")] |
480 | | platform_property_manager: Arc<PlatformPropertyManager>, |
481 | | |
482 | | #[metric( |
483 | | help = "Timeout of how long to evict workers if no response in this given amount of time in seconds." |
484 | | )] |
485 | | worker_timeout_s: u64, |
486 | | /// Shared worker registry for checking worker liveness. |
487 | | worker_registry: SharedWorkerRegistry, |
488 | | |
489 | | /// Performance metrics for observability. |
490 | | metrics: Arc<SchedulerMetrics>, |
491 | | } |
492 | | |
493 | | impl ApiWorkerScheduler { |
494 | 32 | pub fn new( |
495 | 32 | worker_state_manager: Arc<dyn WorkerStateManager>, |
496 | 32 | platform_property_manager: Arc<PlatformPropertyManager>, |
497 | 32 | allocation_strategy: WorkerAllocationStrategy, |
498 | 32 | worker_change_notify: Arc<Notify>, |
499 | 32 | worker_timeout_s: u64, |
500 | 32 | worker_registry: SharedWorkerRegistry, |
501 | 32 | ) -> Arc<Self> { |
502 | 32 | Arc::new(Self { |
503 | 32 | inner: Mutex::new(ApiWorkerSchedulerImpl { |
504 | 32 | workers: Workers(LruCache::unbounded()), |
505 | 32 | worker_state_manager, |
506 | 32 | allocation_strategy, |
507 | 32 | worker_change_notify, |
508 | 32 | worker_registry: worker_registry.clone(), |
509 | 32 | shutting_down: false, |
510 | 32 | capability_index: WorkerCapabilityIndex::new(), |
511 | 32 | }), |
512 | 32 | platform_property_manager, |
513 | 32 | worker_timeout_s, |
514 | 32 | worker_registry, |
515 | 32 | metrics: Arc::new(SchedulerMetrics::default()), |
516 | 32 | }) |
517 | 32 | } |
518 | | |
519 | | /// Returns a reference to the worker registry. |
520 | 0 | pub const fn worker_registry(&self) -> &SharedWorkerRegistry { |
521 | 0 | &self.worker_registry |
522 | 0 | } |
523 | | |
524 | 33 | pub async fn worker_notify_run_action( |
525 | 33 | &self, |
526 | 33 | worker_id: WorkerId, |
527 | 33 | operation_id: OperationId, |
528 | 33 | action_info: ActionInfoWithProps, |
529 | 33 | ) -> Result<(), Error> { |
530 | 33 | self.metrics |
531 | 33 | .actions_dispatched |
532 | 33 | .fetch_add(1, Ordering::Relaxed); |
533 | 33 | let mut inner = self.inner.lock().await; |
534 | 33 | inner |
535 | 33 | .worker_notify_run_action(worker_id, operation_id, action_info) |
536 | 33 | .await |
537 | 33 | } |
538 | | |
539 | | /// Returns the scheduler metrics for observability. |
540 | | #[must_use] |
541 | 0 | pub const fn get_metrics(&self) -> &Arc<SchedulerMetrics> { |
542 | 0 | &self.metrics |
543 | 0 | } |
544 | | |
545 | | /// Attempts to find a worker that is capable of running this action. |
546 | | // TODO(palfrey) This algorithm is not very efficient. Simple testing using a tree-like |
547 | | // structure showed worse performance on a 10_000 worker * 7 properties * 1000 queued tasks |
548 | | // simulation of worst cases in a single threaded environment. |
549 | 57 | pub async fn find_worker_for_action( |
550 | 57 | &self, |
551 | 57 | platform_properties: &PlatformProperties, |
552 | 57 | full_worker_logging: bool, |
553 | 57 | ) -> Option<WorkerId> { |
554 | 57 | let start = Instant::now(); |
555 | 57 | self.metrics |
556 | 57 | .find_worker_calls |
557 | 57 | .fetch_add(1, Ordering::Relaxed); |
558 | | |
559 | 57 | let inner = self.inner.lock().await; |
560 | 57 | let worker_count = inner.workers.len() as u64; |
561 | 57 | let result = inner.inner_find_worker_for_action(platform_properties, full_worker_logging); |
562 | | |
563 | | // Track workers iterated (worst case is all workers) |
564 | 57 | self.metrics |
565 | 57 | .workers_iterated |
566 | 57 | .fetch_add(worker_count, Ordering::Relaxed); |
567 | | |
568 | 57 | if result.is_some() { Branch (568:12): [True: 0, False: 0]
Branch (568:12): [Folded - Ignored]
Branch (568:12): [True: 8, False: 0]
Branch (568:12): [True: 28, False: 19]
Branch (568:12): [True: 1, False: 1]
|
569 | 37 | self.metrics |
570 | 37 | .find_worker_hits |
571 | 37 | .fetch_add(1, Ordering::Relaxed); |
572 | 37 | } else { |
573 | 20 | self.metrics |
574 | 20 | .find_worker_misses |
575 | 20 | .fetch_add(1, Ordering::Relaxed); |
576 | 20 | } |
577 | | |
578 | | #[allow(clippy::cast_possible_truncation)] |
579 | 57 | self.metrics |
580 | 57 | .find_worker_time_ns |
581 | 57 | .fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed); |
582 | 57 | result |
583 | 57 | } |
584 | | |
585 | | /// Checks to see if the worker exists in the worker pool. Should only be used in unit tests. |
586 | | #[must_use] |
587 | 7 | pub async fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool { |
588 | 7 | let inner = self.inner.lock().await; |
589 | 7 | inner.workers.contains(worker_id) |
590 | 7 | } |
591 | | |
592 | | /// A unit test function used to send the keep alive message to the worker from the server. |
593 | 0 | pub async fn send_keep_alive_to_worker_for_test( |
594 | 0 | &self, |
595 | 0 | worker_id: &WorkerId, |
596 | 1 | ) -> Result<(), Error> { |
597 | 1 | let mut inner = self.inner.lock().await; |
598 | 1 | let worker = inner.workers.get_mut(worker_id).ok_or_else(|| {0 |
599 | 0 | make_input_err!("WorkerId '{}' does not exist in workers map", worker_id) |
600 | 0 | })?; |
601 | 1 | worker.keep_alive() |
602 | 1 | } |
603 | | } |
604 | | |
605 | | #[async_trait] |
606 | | impl WorkerScheduler for ApiWorkerScheduler { |
607 | 2 | fn get_platform_property_manager(&self) -> &PlatformPropertyManager { |
608 | 2 | self.platform_property_manager.as_ref() |
609 | 2 | } |
610 | | |
611 | 37 | async fn add_worker(&self, worker: Worker) -> Result<(), Error> { |
612 | | let worker_id = worker.id.clone(); |
613 | | let worker_timestamp = worker.last_update_timestamp; |
614 | | let mut inner = self.inner.lock().await; |
615 | | if inner.shutting_down { |
616 | | warn!("Rejected worker add during shutdown: {}", worker_id); |
617 | | return Err(make_err!( |
618 | | Code::Unavailable, |
619 | | "Received request to add worker while shutting down" |
620 | | )); |
621 | | } |
622 | | let result = inner |
623 | | .add_worker(worker) |
624 | | .err_tip(|| "Error while adding worker, removing from pool"); |
625 | | if let Err(err) = result { |
626 | | return Result::<(), _>::Err(err.clone()) |
627 | | .merge(inner.immediate_evict_worker(&worker_id, err, false).await); |
628 | | } |
629 | | |
630 | | let now = UNIX_EPOCH + Duration::from_secs(worker_timestamp); |
631 | | self.worker_registry.register_worker(&worker_id, now).await; |
632 | | |
633 | | self.metrics.workers_added.fetch_add(1, Ordering::Relaxed); |
634 | | Ok(()) |
635 | 37 | } |
636 | | |
637 | | async fn update_action( |
638 | | &self, |
639 | | worker_id: &WorkerId, |
640 | | operation_id: &OperationId, |
641 | | update: UpdateOperationType, |
642 | 10 | ) -> Result<(), Error> { |
643 | | let mut inner = self.inner.lock().await; |
644 | | inner.update_action(worker_id, operation_id, update).await |
645 | 10 | } |
646 | | |
647 | | async fn worker_keep_alive_received( |
648 | | &self, |
649 | | worker_id: &WorkerId, |
650 | | timestamp: WorkerTimestamp, |
651 | 2 | ) -> Result<(), Error> { |
652 | | { |
653 | | let mut inner = self.inner.lock().await; |
654 | | inner |
655 | | .refresh_lifetime(worker_id, timestamp) |
656 | | .err_tip(|| "Error refreshing lifetime in worker_keep_alive_received()")?; |
657 | | } |
658 | | let now = UNIX_EPOCH + Duration::from_secs(timestamp); |
659 | | self.worker_registry |
660 | | .update_worker_heartbeat(worker_id, now) |
661 | | .await; |
662 | | Ok(()) |
663 | 2 | } |
664 | | |
665 | 6 | async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> { |
666 | | self.worker_registry.remove_worker(worker_id).await; |
667 | | |
668 | | let mut inner = self.inner.lock().await; |
669 | | inner |
670 | | .immediate_evict_worker( |
671 | | worker_id, |
672 | | make_err!(Code::Internal, "Received request to remove worker"), |
673 | | false, |
674 | | ) |
675 | | .await |
676 | 6 | } |
677 | | |
678 | 0 | async fn shutdown(&self, shutdown_guard: ShutdownGuard) { |
679 | | let mut inner = self.inner.lock().await; |
680 | | inner.shutting_down = true; // should reject further worker registration |
681 | | while let Some(worker_id) = inner |
682 | | .workers |
683 | | .peek_lru() |
684 | 0 | .map(|(worker_id, _worker)| worker_id.clone()) |
685 | | { |
686 | | if let Err(err) = inner |
687 | | .immediate_evict_worker( |
688 | | &worker_id, |
689 | | make_err!(Code::Internal, "Scheduler shutdown"), |
690 | | true, |
691 | | ) |
692 | | .await |
693 | | { |
694 | | error!(?err, "Error evicting worker on shutdown."); |
695 | | } |
696 | | } |
697 | | drop(shutdown_guard); |
698 | 0 | } |
699 | | |
700 | 5 | async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> { |
701 | | // Check worker liveness using both the local timestamp (from LRU) |
702 | | // and the worker registry. A worker is alive if either source says it's alive. |
703 | | let timeout = Duration::from_secs(self.worker_timeout_s); |
704 | | let now = UNIX_EPOCH + Duration::from_secs(now_timestamp); |
705 | | let timeout_threshold = now_timestamp.saturating_sub(self.worker_timeout_s); |
706 | | |
707 | | let workers_to_check: Vec<(WorkerId, bool)> = { |
708 | | let inner = self.inner.lock().await; |
709 | | inner |
710 | | .workers |
711 | | .iter() |
712 | 6 | .map(|(worker_id, worker)| { |
713 | 6 | let local_alive = worker.last_update_timestamp > timeout_threshold; |
714 | 6 | (worker_id.clone(), local_alive) |
715 | 6 | }) |
716 | | .collect() |
717 | | }; |
718 | | |
719 | | let mut worker_ids_to_remove = Vec::new(); |
720 | | for (worker_id, local_alive) in workers_to_check { |
721 | | if local_alive { |
722 | | continue; |
723 | | } |
724 | | |
725 | | let registry_alive = self |
726 | | .worker_registry |
727 | | .is_worker_alive(&worker_id, timeout, now) |
728 | | .await; |
729 | | |
730 | | if !registry_alive { |
731 | | trace!( |
732 | | ?worker_id, |
733 | | local_alive, |
734 | | registry_alive, |
735 | | timeout_threshold, |
736 | | "Worker timed out - neither local nor registry shows alive" |
737 | | ); |
738 | | worker_ids_to_remove.push(worker_id); |
739 | | } |
740 | | } |
741 | | |
742 | | if worker_ids_to_remove.is_empty() { |
743 | | return Ok(()); |
744 | | } |
745 | | |
746 | | let mut inner = self.inner.lock().await; |
747 | | let mut result = Ok(()); |
748 | | |
749 | | for worker_id in &worker_ids_to_remove { |
750 | | warn!(?worker_id, "Worker timed out, removing from pool"); |
751 | | result = result.merge( |
752 | | inner |
753 | | .immediate_evict_worker( |
754 | | worker_id, |
755 | | make_err!( |
756 | | Code::Internal, |
757 | | "Worker {worker_id} timed out, removing from pool" |
758 | | ), |
759 | | false, |
760 | | ) |
761 | | .await, |
762 | | ); |
763 | | } |
764 | | |
765 | | result |
766 | 5 | } |
767 | | |
768 | 2 | async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> { |
769 | | let mut inner = self.inner.lock().await; |
770 | | inner.set_drain_worker(worker_id, is_draining).await |
771 | 2 | } |
772 | | } |
773 | | |
774 | | impl RootMetricsComponent for ApiWorkerScheduler {} |