/build/source/nativelink-scheduler/src/api_worker_scheduler.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::ops::{Deref, DerefMut}; |
16 | | use std::collections::{HashMap, HashSet}; |
17 | | use std::sync::Arc; |
18 | | |
19 | | use async_lock::Mutex; |
20 | | use lru::LruCache; |
21 | | use nativelink_config::schedulers::WorkerAllocationStrategy; |
22 | | use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err}; |
23 | | use nativelink_metric::{ |
24 | | MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, |
25 | | RootMetricsComponent, group, |
26 | | }; |
27 | | use nativelink_util::action_messages::{OperationId, WorkerId}; |
28 | | use nativelink_util::operation_state_manager::{UpdateOperationType, WorkerStateManager}; |
29 | | use nativelink_util::platform_properties::PlatformProperties; |
30 | | use nativelink_util::shutdown_guard::ShutdownGuard; |
31 | | use nativelink_util::spawn; |
32 | | use nativelink_util::task::JoinHandleDropGuard; |
33 | | use tokio::sync::Notify; |
34 | | use tokio::sync::mpsc::{self, UnboundedSender}; |
35 | | use tonic::async_trait; |
36 | | #[cfg(feature = "worker_find_logging")] |
37 | | use tracing::info; |
38 | | use tracing::{error, warn}; |
39 | | |
40 | | use crate::platform_property_manager::PlatformPropertyManager; |
41 | | use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp, WorkerUpdate}; |
42 | | use crate::worker_scheduler::WorkerScheduler; |
43 | | |
44 | | #[derive(Debug)] |
45 | | struct Workers(LruCache<WorkerId, Worker>); |
46 | | |
47 | | impl Deref for Workers { |
48 | | type Target = LruCache<WorkerId, Worker>; |
49 | | |
50 | 62 | fn deref(&self) -> &Self::Target { |
51 | 62 | &self.0 |
52 | 62 | } |
53 | | } |
54 | | |
55 | | impl DerefMut for Workers { |
56 | 124 | fn deref_mut(&mut self) -> &mut Self::Target { |
57 | 124 | &mut self.0 |
58 | 124 | } |
59 | | } |
60 | | |
61 | | // Note: This could not be a derive macro because this derive-macro |
62 | | // does not support LruCache and nameless field structs. |
63 | | impl MetricsComponent for Workers { |
64 | 0 | fn publish( |
65 | 0 | &self, |
66 | 0 | _kind: MetricKind, |
67 | 0 | _field_metadata: MetricFieldData, |
68 | 0 | ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { |
69 | 0 | let _enter = group!("workers").entered(); |
70 | 0 | for (worker_id, worker) in self.iter() { |
71 | 0 | let _enter = group!(worker_id).entered(); |
72 | 0 | worker.publish(MetricKind::Component, MetricFieldData::default())?; |
73 | | } |
74 | 0 | Ok(MetricPublishKnownKindData::Component) |
75 | 0 | } |
76 | | } |
77 | | |
78 | | /// A collection of workers that are available to run tasks. |
79 | | #[derive(MetricsComponent)] |
80 | | struct ApiWorkerSchedulerImpl { |
81 | | /// A `LruCache` of workers available based on `allocation_strategy`. |
82 | | #[metric(group = "workers")] |
83 | | workers: Workers, |
84 | | |
85 | | /// A set of operations that have notified completion, but are still |
86 | | /// uploading results. |
87 | | pending_results: HashMap<WorkerId, HashSet<OperationId>>, |
88 | | |
89 | | /// The worker state manager. |
90 | | #[metric(group = "worker_state_manager")] |
91 | | worker_state_manager: Arc<dyn WorkerStateManager>, |
92 | | /// The allocation strategy for workers. |
93 | | allocation_strategy: WorkerAllocationStrategy, |
94 | | /// A channel to notify the matching engine that the worker pool has changed. |
95 | | worker_change_notify: Arc<Notify>, |
96 | | /// A channel to notify that an operation is still alive. |
97 | | operation_keep_alive_tx: UnboundedSender<(OperationId, WorkerId)>, |
98 | | } |
99 | | |
100 | | impl core::fmt::Debug for ApiWorkerSchedulerImpl { |
101 | 0 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
102 | 0 | f.debug_struct("ApiWorkerSchedulerImpl") |
103 | 0 | .field("workers", &self.workers) |
104 | 0 | .field("allocation_strategy", &self.allocation_strategy) |
105 | 0 | .field("worker_change_notify", &self.worker_change_notify) |
106 | 0 | .field("operation_keep_alive_tx", &self.operation_keep_alive_tx) |
107 | 0 | .finish_non_exhaustive() |
108 | 0 | } |
109 | | } |
110 | | |
111 | | impl ApiWorkerSchedulerImpl { |
112 | | /// Refreshes the lifetime of the worker with the given timestamp. |
113 | 2 | fn refresh_lifetime( |
114 | 2 | &mut self, |
115 | 2 | worker_id: &WorkerId, |
116 | 2 | timestamp: WorkerTimestamp, |
117 | 2 | ) -> Result<(), Error> { |
118 | 2 | let worker = self.workers.0.peek_mut(worker_id).ok_or_else(|| {0 |
119 | 0 | make_input_err!( |
120 | | "Worker not found in worker map in refresh_lifetime() {}", |
121 | | worker_id |
122 | | ) |
123 | 0 | })?; |
124 | 0 | error_if!( |
125 | 2 | worker.last_update_timestamp > timestamp, Branch (125:13): [True: 0, False: 2]
Branch (125:13): [Folded - Ignored]
|
126 | | "Worker already had a timestamp of {}, but tried to update it with {}", |
127 | | worker.last_update_timestamp, |
128 | | timestamp |
129 | | ); |
130 | 2 | worker.last_update_timestamp = timestamp; |
131 | 2 | for operation_id0 in worker.running_action_infos.keys() { |
132 | 0 | if self Branch (132:16): [True: 0, False: 0]
Branch (132:16): [Folded - Ignored]
|
133 | 0 | .operation_keep_alive_tx |
134 | 0 | .send((operation_id.clone(), worker_id.clone())) |
135 | 0 | .is_err() |
136 | | { |
137 | 0 | error!( |
138 | | ?operation_id, |
139 | | ?worker_id, |
140 | 0 | "OperationKeepAliveTx stream closed" |
141 | | ); |
142 | 0 | } |
143 | | } |
144 | 2 | Ok(()) |
145 | 2 | } |
146 | | |
147 | | /// Adds a worker to the pool. |
148 | | /// Note: This function will not do any task matching. |
149 | 35 | fn add_worker(&mut self, worker: Worker) -> Result<(), Error> { |
150 | 35 | let worker_id = worker.id.clone(); |
151 | 35 | self.workers.put(worker_id.clone(), worker); |
152 | | |
153 | | // Worker is not cloneable, and we do not want to send the initial connection results until |
154 | | // we have added it to the map, or we might get some strange race conditions due to the way |
155 | | // the multi-threaded runtime works. |
156 | 35 | let worker = self.workers.peek_mut(&worker_id).unwrap(); |
157 | 35 | let res = worker |
158 | 35 | .send_initial_connection_result() |
159 | 35 | .err_tip(|| "Failed to send initial connection result to worker"); |
160 | 35 | if let Err(err0 ) = &res { Branch (160:16): [True: 0, False: 35]
Branch (160:16): [Folded - Ignored]
|
161 | 0 | error!( |
162 | | ?worker_id, |
163 | | ?err, |
164 | 0 | "Worker connection appears to have been closed while adding to pool" |
165 | | ); |
166 | 35 | } |
167 | 35 | self.worker_change_notify.notify_one(); |
168 | 35 | res |
169 | 35 | } |
170 | | |
171 | | /// Removes worker from pool. |
172 | | /// Note: The caller is responsible for any rescheduling of any tasks that might be |
173 | | /// running. |
174 | 9 | fn remove_worker(&mut self, worker_id: &WorkerId) -> Option<Worker> { |
175 | 9 | let result = self.workers.pop(worker_id); |
176 | 9 | self.pending_results.remove(worker_id); |
177 | 9 | self.worker_change_notify.notify_one(); |
178 | 9 | result |
179 | 9 | } |
180 | | |
181 | | /// Sets if the worker is draining or not. |
182 | 2 | async fn set_drain_worker( |
183 | 2 | &mut self, |
184 | 2 | worker_id: &WorkerId, |
185 | 2 | is_draining: bool, |
186 | 2 | ) -> Result<(), Error> { |
187 | 2 | let worker = self |
188 | 2 | .workers |
189 | 2 | .get_mut(worker_id) |
190 | 2 | .err_tip(|| format!("Worker {worker_id} doesn't exist in the pool"0 ))?0 ; |
191 | 2 | worker.is_draining = is_draining; |
192 | 2 | self.worker_change_notify.notify_one(); |
193 | 2 | Ok(()) |
194 | 2 | } |
195 | | |
196 | | #[cfg_attr(not(feature = "worker_find_logging"), allow(unused_variables))] |
197 | 38 | fn inner_worker_checker( |
198 | 38 | (worker_id, w): &(&WorkerId, &Worker), |
199 | 38 | platform_properties: &PlatformProperties, |
200 | 38 | ) -> bool { |
201 | | #[cfg(feature = "worker_find_logging")] |
202 | | { |
203 | | if !w.can_accept_work() { |
204 | | info!( |
205 | | "Worker {worker_id} cannot accept work because is_paused: {}, is_draining: {}", |
206 | | w.is_paused, w.is_draining |
207 | | ); |
208 | | return false; |
209 | | } |
210 | | if !platform_properties.is_satisfied_by(&w.platform_properties) { |
211 | | info!("Worker {worker_id} properties are insufficient"); |
212 | | return false; |
213 | | } |
214 | | return true; |
215 | | } |
216 | | #[cfg(not(feature = "worker_find_logging"))] |
217 | | { |
218 | 38 | w.can_accept_work() && platform_properties37 .is_satisfied_by37 (&w.platform_properties37 ) Branch (218:13): [True: 37, False: 1]
Branch (218:13): [Folded - Ignored]
|
219 | | } |
220 | 38 | } |
221 | | |
222 | 50 | fn inner_find_worker_for_action( |
223 | 50 | &self, |
224 | 50 | platform_properties: &PlatformProperties, |
225 | 50 | ) -> Option<WorkerId> { |
226 | 50 | let mut workers_iter = self.workers.iter(); |
227 | 50 | let workers_iter = |
228 | 50 | match self.allocation_strategy { |
229 | | // Use rfind to get the least recently used that satisfies the properties. |
230 | 50 | WorkerAllocationStrategy::LeastRecentlyUsed => workers_iter |
231 | 50 | .rfind(|worker| Self::inner_worker_checker38 (worker38 , platform_properties38 )), |
232 | | // Use find to get the most recently used that satisfies the properties. |
233 | 0 | WorkerAllocationStrategy::MostRecentlyUsed => workers_iter |
234 | 0 | .find(|worker| Self::inner_worker_checker(worker, platform_properties)), |
235 | | }; |
236 | 50 | workers_iter.map(|(_, w)| w.id32 .clone32 ()) |
237 | 50 | } |
238 | | |
239 | 10 | async fn update_action( |
240 | 10 | &mut self, |
241 | 10 | worker_id: &WorkerId, |
242 | 10 | operation_id: &OperationId, |
243 | 10 | update: UpdateOperationType, |
244 | 10 | ) -> Result<(), Error> { |
245 | 10 | let worker = self.workers.get_mut(worker_id).err_tip(|| {0 |
246 | 0 | format!("Worker {worker_id} does not exist in SimpleScheduler::update_action") |
247 | 0 | })?; |
248 | | |
249 | | // Ensure the worker is supposed to be running the operation. |
250 | 10 | let pending_completion = !worker.running_action_infos.contains_key(operation_id); |
251 | 10 | if pending_completion Branch (251:12): [True: 1, False: 9]
Branch (251:12): [Folded - Ignored]
|
252 | 1 | && !self Branch (252:16): [True: 1, False: 0]
Branch (252:16): [Folded - Ignored]
|
253 | 1 | .pending_results |
254 | 1 | .get(worker_id) |
255 | 1 | .is_some_and(|pending_operations| pending_operations0 .contains0 (operation_id0 )) |
256 | | { |
257 | 1 | return Err(make_err!( |
258 | 1 | Code::Internal, |
259 | 1 | "Operation {operation_id} should not be running on worker {worker_id} in SimpleScheduler::update_action" |
260 | 1 | )); |
261 | 9 | } |
262 | | |
263 | 9 | let (is_finished, due_to_backpressure) = match &update { |
264 | 6 | UpdateOperationType::UpdateWithActionStage(action_stage) => { |
265 | 6 | (action_stage.is_finished(), false) |
266 | | } |
267 | 0 | UpdateOperationType::KeepAlive => (false, false), |
268 | 3 | UpdateOperationType::UpdateWithError(err) => { |
269 | 3 | (true, err.code == Code::ResourceExhausted) |
270 | | } |
271 | 0 | UpdateOperationType::UpdateWithDisconnect => (true, false), |
272 | | }; |
273 | | |
274 | | // Update the operation in the worker state manager. |
275 | | { |
276 | 9 | let update_operation_res = self |
277 | 9 | .worker_state_manager |
278 | 9 | .update_operation(operation_id, worker_id, update) |
279 | 9 | .await |
280 | 9 | .err_tip(|| "in update_operation on SimpleScheduler::update_action"); |
281 | 9 | if let Err(err0 ) = update_operation_res { Branch (281:20): [True: 0, False: 9]
Branch (281:20): [Folded - Ignored]
|
282 | 0 | error!( |
283 | | ?operation_id, |
284 | | ?worker_id, |
285 | | ?err, |
286 | 0 | "Failed to update_operation on update_action" |
287 | | ); |
288 | 0 | return Err(err); |
289 | 9 | } |
290 | | } |
291 | | |
292 | 9 | if !is_finished { Branch (292:12): [True: 0, False: 9]
Branch (292:12): [Folded - Ignored]
|
293 | 0 | return Ok(()); |
294 | 9 | } |
295 | | |
296 | 9 | if pending_completion { Branch (296:12): [True: 0, False: 9]
Branch (296:12): [Folded - Ignored]
|
297 | | // This is absolutely always true, but pattern match anyway. |
298 | 0 | if let Some(pending_operations) = self.pending_results.get_mut(worker_id) { Branch (298:20): [True: 0, False: 0]
Branch (298:20): [Folded - Ignored]
|
299 | 0 | pending_operations.remove(operation_id); |
300 | 0 | if pending_operations.is_empty() { Branch (300:20): [True: 0, False: 0]
Branch (300:20): [Folded - Ignored]
|
301 | 0 | self.pending_results.remove(worker_id); |
302 | 0 | } |
303 | 0 | return Ok(()); |
304 | 0 | } |
305 | 9 | } |
306 | | |
307 | 9 | Self::complete_worker_action( |
308 | 9 | worker, |
309 | 9 | operation_id, |
310 | 9 | due_to_backpressure, |
311 | 9 | &self.worker_change_notify, |
312 | 9 | ) |
313 | 9 | .await |
314 | 10 | } |
315 | | |
316 | 9 | async fn complete_worker_action( |
317 | 9 | worker: &mut Worker, |
318 | 9 | operation_id: &OperationId, |
319 | 9 | due_to_backpressure: bool, |
320 | 9 | notify: &Notify, |
321 | 9 | ) -> Result<(), Error> { |
322 | | // Clear this action from the current worker if finished. |
323 | 9 | let complete_action_res = { |
324 | 9 | let was_paused = !worker.can_accept_work(); |
325 | | |
326 | | // Note: We need to run this before dealing with backpressure logic. |
327 | 9 | let complete_action_res = worker.complete_action(operation_id).await; |
328 | | |
329 | | // Only pause if there's an action still waiting that will unpause. |
330 | 9 | if (was_paused || due_to_backpressure) && worker0 .has_actions0 () { Branch (330:17): [True: 0, False: 9]
Branch (330:31): [True: 0, False: 9]
Branch (330:55): [True: 0, False: 0]
Branch (330:17): [Folded - Ignored]
Branch (330:31): [Folded - Ignored]
Branch (330:55): [Folded - Ignored]
|
331 | 0 | worker.is_paused = true; |
332 | 9 | } |
333 | 9 | complete_action_res |
334 | | }; |
335 | | |
336 | 9 | notify.notify_one(); |
337 | | |
338 | 9 | complete_action_res |
339 | 9 | } |
340 | | |
341 | 0 | async fn notify_complete( |
342 | 0 | &mut self, |
343 | 0 | worker_id: &WorkerId, |
344 | 0 | operation_id: &OperationId, |
345 | 0 | ) -> Result<(), Error> { |
346 | 0 | let worker = self.workers.get_mut(worker_id).err_tip(|| { |
347 | 0 | format!("Worker {worker_id} does not exist in SimpleScheduler::notify_complete") |
348 | 0 | })?; |
349 | | |
350 | | // Ensure the worker is supposed to be running the operation. |
351 | 0 | if !worker.running_action_infos.contains_key(operation_id) { Branch (351:12): [True: 0, False: 0]
Branch (351:12): [Folded - Ignored]
|
352 | 0 | let err = make_err!( |
353 | 0 | Code::Internal, |
354 | | "Operation {operation_id} should not be running on worker {worker_id} in SimpleScheduler::update_action" |
355 | | ); |
356 | 0 | return Err(err); |
357 | 0 | } |
358 | | |
359 | 0 | match self.pending_results.entry(worker_id.clone()) { |
360 | 0 | std::collections::hash_map::Entry::Occupied(mut occupied_entry) => { |
361 | 0 | occupied_entry.get_mut().insert(operation_id.clone()); |
362 | 0 | } |
363 | 0 | std::collections::hash_map::Entry::Vacant(vacant_entry) => { |
364 | 0 | vacant_entry |
365 | 0 | .insert(HashSet::new()) |
366 | 0 | .insert(operation_id.clone()); |
367 | 0 | } |
368 | | } |
369 | | |
370 | 0 | Self::complete_worker_action(worker, operation_id, false, &self.worker_change_notify).await |
371 | 0 | } |
372 | | |
373 | | /// Notifies the specified worker to run the given action and handles errors by evicting |
374 | | /// the worker if the notification fails. |
375 | 32 | async fn worker_notify_run_action( |
376 | 32 | &mut self, |
377 | 32 | worker_id: WorkerId, |
378 | 32 | operation_id: OperationId, |
379 | 32 | action_info: ActionInfoWithProps, |
380 | 32 | ) -> Result<(), Error> { |
381 | 32 | if let Some(worker) = self.workers.get_mut(&worker_id) { Branch (381:16): [True: 0, False: 0]
Branch (381:16): [Folded - Ignored]
Branch (381:16): [True: 4, False: 0]
Branch (381:16): [True: 27, False: 0]
Branch (381:16): [True: 1, False: 0]
|
382 | 32 | let notify_worker_result = worker |
383 | 32 | .notify_update(WorkerUpdate::RunAction((operation_id, action_info.clone()))) |
384 | 32 | .await; |
385 | | |
386 | 32 | if let Err(notify_worker_result1 ) = notify_worker_result { Branch (386:20): [True: 0, False: 0]
Branch (386:20): [Folded - Ignored]
Branch (386:20): [True: 0, False: 4]
Branch (386:20): [True: 1, False: 26]
Branch (386:20): [True: 0, False: 1]
|
387 | 1 | warn!( |
388 | | ?worker_id, |
389 | | ?action_info, |
390 | | ?notify_worker_result, |
391 | 1 | "Worker command failed, removing worker", |
392 | | ); |
393 | | |
394 | | // A slightly nasty way of figuring out that the worker disconnected |
395 | | // from send_msg_to_worker without introducing complexity to the |
396 | | // code path from here to there. |
397 | 1 | let is_disconnect = notify_worker_result.code == Code::Internal Branch (397:37): [True: 0, False: 0]
Branch (397:37): [Folded - Ignored]
Branch (397:37): [True: 0, False: 0]
Branch (397:37): [True: 1, False: 0]
Branch (397:37): [True: 0, False: 0]
|
398 | 1 | && notify_worker_result.messages.len() == 1 Branch (398:24): [True: 0, False: 0]
Branch (398:24): [Folded - Ignored]
Branch (398:24): [True: 0, False: 0]
Branch (398:24): [True: 1, False: 0]
Branch (398:24): [True: 0, False: 0]
|
399 | 1 | && notify_worker_result.messages[0] == "Worker Disconnected"; |
400 | | |
401 | 1 | let err = make_err!( |
402 | 1 | Code::Internal, |
403 | | "Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}", |
404 | | ); |
405 | | |
406 | 1 | return Result::<(), _>::Err(err.clone()).merge( |
407 | 1 | self.immediate_evict_worker(&worker_id, err, is_disconnect) |
408 | 1 | .await, |
409 | | ); |
410 | 31 | } |
411 | | } else { |
412 | 0 | warn!( |
413 | | ?worker_id, |
414 | | ?operation_id, |
415 | | ?action_info, |
416 | 0 | "Worker not found in worker map in worker_notify_run_action" |
417 | | ); |
418 | | } |
419 | 31 | Ok(()) |
420 | 32 | } |
421 | | |
422 | | /// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it. |
423 | 9 | async fn immediate_evict_worker( |
424 | 9 | &mut self, |
425 | 9 | worker_id: &WorkerId, |
426 | 9 | err: Error, |
427 | 9 | is_disconnect: bool, |
428 | 9 | ) -> Result<(), Error> { |
429 | 9 | let mut result = Ok(()); |
430 | 9 | if let Some(mut worker) = self.remove_worker(worker_id) { Branch (430:16): [True: 8, False: 0]
Branch (430:16): [Folded - Ignored]
Branch (430:16): [True: 0, False: 0]
Branch (430:16): [True: 1, False: 0]
Branch (430:16): [True: 0, False: 0]
|
431 | | // We don't care if we fail to send message to worker, this is only a best attempt. |
432 | 9 | drop(worker.notify_update(WorkerUpdate::Disconnect).await); |
433 | 9 | let update = if is_disconnect { Branch (433:29): [True: 0, False: 8]
Branch (433:29): [Folded - Ignored]
Branch (433:29): [True: 0, False: 0]
Branch (433:29): [True: 0, False: 1]
Branch (433:29): [True: 0, False: 0]
|
434 | 0 | UpdateOperationType::UpdateWithDisconnect |
435 | | } else { |
436 | 9 | UpdateOperationType::UpdateWithError(err) |
437 | | }; |
438 | 9 | for (operation_id8 , _) in worker.running_action_infos.drain() { |
439 | 8 | result = result.merge( |
440 | 8 | self.worker_state_manager |
441 | 8 | .update_operation(&operation_id, worker_id, update.clone()) |
442 | 8 | .await, |
443 | | ); |
444 | | } |
445 | 0 | } |
446 | | // Note: Calling this many time is very cheap, it'll only trigger `do_try_match` once. |
447 | | // TODO(palfrey) This should be moved to inside the Workers struct. |
448 | 9 | self.worker_change_notify.notify_one(); |
449 | 9 | result |
450 | 9 | } |
451 | | } |
452 | | |
453 | | #[derive(Debug, MetricsComponent)] |
454 | | pub struct ApiWorkerScheduler { |
455 | | #[metric] |
456 | | inner: Mutex<ApiWorkerSchedulerImpl>, |
457 | | #[metric(group = "platform_property_manager")] |
458 | | platform_property_manager: Arc<PlatformPropertyManager>, |
459 | | |
460 | | #[metric( |
461 | | help = "Timeout of how long to evict workers if no response in this given amount of time in seconds." |
462 | | )] |
463 | | worker_timeout_s: u64, |
464 | | _operation_keep_alive_spawn: JoinHandleDropGuard<()>, |
465 | | } |
466 | | |
467 | | impl ApiWorkerScheduler { |
468 | 29 | pub fn new( |
469 | 29 | worker_state_manager: Arc<dyn WorkerStateManager>, |
470 | 29 | platform_property_manager: Arc<PlatformPropertyManager>, |
471 | 29 | allocation_strategy: WorkerAllocationStrategy, |
472 | 29 | worker_change_notify: Arc<Notify>, |
473 | 29 | worker_timeout_s: u64, |
474 | 29 | ) -> Arc<Self> { |
475 | 29 | let (operation_keep_alive_tx, mut operation_keep_alive_rx) = mpsc::unbounded_channel(); |
476 | 29 | Arc::new(Self { |
477 | 29 | inner: Mutex::new(ApiWorkerSchedulerImpl { |
478 | 29 | workers: Workers(LruCache::unbounded()), |
479 | 29 | pending_results: HashMap::new(), |
480 | 29 | worker_state_manager: worker_state_manager.clone(), |
481 | 29 | allocation_strategy, |
482 | 29 | worker_change_notify, |
483 | 29 | operation_keep_alive_tx, |
484 | 29 | }), |
485 | 29 | platform_property_manager, |
486 | 29 | worker_timeout_s, |
487 | 29 | _operation_keep_alive_spawn: spawn!( |
488 | | "simple_scheduler_operation_keep_alive", |
489 | 24 | async move { |
490 | | const RECV_MANY_LIMIT: usize = 256; |
491 | 24 | let mut messages = Vec::with_capacity(RECV_MANY_LIMIT); |
492 | | loop { |
493 | 24 | messages.clear(); |
494 | 24 | operation_keep_alive_rx |
495 | 24 | .recv_many(&mut messages, RECV_MANY_LIMIT) |
496 | 24 | .await; |
497 | 0 | if messages.is_empty() { Branch (497:28): [True: 0, False: 0]
Branch (497:28): [Folded - Ignored]
|
498 | 0 | return; // Looks like our sender has been dropped. |
499 | 0 | } |
500 | 0 | for (operation_id, worker_id) in messages.drain(..) { |
501 | 0 | let update_operation_res = worker_state_manager |
502 | 0 | .update_operation( |
503 | 0 | &operation_id, |
504 | 0 | &worker_id, |
505 | 0 | UpdateOperationType::KeepAlive, |
506 | 0 | ) |
507 | 0 | .await; |
508 | 0 | if let Err(err) = update_operation_res { Branch (508:36): [True: 0, False: 0]
Branch (508:36): [Folded - Ignored]
|
509 | 0 | warn!( |
510 | | ?err, |
511 | 0 | "Error while running worker_keep_alive_received, maybe job is done?" |
512 | | ); |
513 | 0 | } |
514 | | } |
515 | | } |
516 | 0 | } |
517 | | ), |
518 | | }) |
519 | 29 | } |
520 | | |
521 | 32 | pub async fn worker_notify_run_action( |
522 | 32 | &self, |
523 | 32 | worker_id: WorkerId, |
524 | 32 | operation_id: OperationId, |
525 | 32 | action_info: ActionInfoWithProps, |
526 | 32 | ) -> Result<(), Error> { |
527 | 32 | let mut inner = self.inner.lock().await; |
528 | 32 | inner |
529 | 32 | .worker_notify_run_action(worker_id, operation_id, action_info) |
530 | 32 | .await |
531 | 32 | } |
532 | | |
533 | | /// Attempts to find a worker that is capable of running this action. |
534 | | // TODO(palfrey) This algorithm is not very efficient. Simple testing using a tree-like |
535 | | // structure showed worse performance on a 10_000 worker * 7 properties * 1000 queued tasks |
536 | | // simulation of worst cases in a single threaded environment. |
537 | 50 | pub async fn find_worker_for_action( |
538 | 50 | &self, |
539 | 50 | platform_properties: &PlatformProperties, |
540 | 50 | ) -> Option<WorkerId> { |
541 | 50 | let inner = self.inner.lock().await; |
542 | 50 | inner.inner_find_worker_for_action(platform_properties) |
543 | 50 | } |
544 | | |
545 | | /// Checks to see if the worker exists in the worker pool. Should only be used in unit tests. |
546 | | #[must_use] |
547 | 7 | pub async fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool { |
548 | 7 | let inner = self.inner.lock().await; |
549 | 7 | inner.workers.contains(worker_id) |
550 | 7 | } |
551 | | |
552 | | /// A unit test function used to send the keep alive message to the worker from the server. |
553 | 0 | pub async fn send_keep_alive_to_worker_for_test( |
554 | 0 | &self, |
555 | 0 | worker_id: &WorkerId, |
556 | 1 | ) -> Result<(), Error> { |
557 | 1 | let mut inner = self.inner.lock().await; |
558 | 1 | let worker = inner.workers.get_mut(worker_id).ok_or_else(|| {0 |
559 | 0 | make_input_err!("WorkerId '{}' does not exist in workers map", worker_id) |
560 | 0 | })?; |
561 | 1 | worker.keep_alive() |
562 | 1 | } |
563 | | } |
564 | | |
565 | | #[async_trait] |
566 | | impl WorkerScheduler for ApiWorkerScheduler { |
567 | 1 | fn get_platform_property_manager(&self) -> &PlatformPropertyManager { |
568 | 1 | self.platform_property_manager.as_ref() |
569 | 1 | } |
570 | | |
571 | 70 | async fn add_worker(&self, worker: Worker) -> Result<(), Error> { |
572 | 35 | let mut inner = self.inner.lock().await; |
573 | 35 | let worker_id = worker.id.clone(); |
574 | 35 | let result = inner |
575 | 35 | .add_worker(worker) |
576 | 35 | .err_tip(|| "Error while adding worker, removing from pool"); |
577 | 35 | if let Err(err0 ) = result { Branch (577:16): [True: 0, False: 35]
Branch (577:16): [Folded - Ignored]
|
578 | 0 | return Result::<(), _>::Err(err.clone()) |
579 | 0 | .merge(inner.immediate_evict_worker(&worker_id, err, false).await); |
580 | 35 | } |
581 | 35 | Ok(()) |
582 | 70 | } |
583 | | |
584 | | async fn update_action( |
585 | | &self, |
586 | | worker_id: &WorkerId, |
587 | | operation_id: &OperationId, |
588 | | update: UpdateOperationType, |
589 | 20 | ) -> Result<(), Error> { |
590 | 10 | let mut inner = self.inner.lock().await; |
591 | 10 | inner.update_action(worker_id, operation_id, update).await |
592 | 20 | } |
593 | | |
594 | | async fn notify_complete( |
595 | | &self, |
596 | | worker_id: &WorkerId, |
597 | | operation_id: &OperationId, |
598 | 0 | ) -> Result<(), Error> { |
599 | 0 | let mut inner = self.inner.lock().await; |
600 | 0 | inner.notify_complete(worker_id, operation_id).await |
601 | 0 | } |
602 | | |
603 | | async fn worker_keep_alive_received( |
604 | | &self, |
605 | | worker_id: &WorkerId, |
606 | | timestamp: WorkerTimestamp, |
607 | 4 | ) -> Result<(), Error> { |
608 | 2 | let mut inner = self.inner.lock().await; |
609 | 2 | inner |
610 | 2 | .refresh_lifetime(worker_id, timestamp) |
611 | 2 | .err_tip(|| "Error refreshing lifetime in worker_keep_alive_received()") |
612 | 4 | } |
613 | | |
614 | 12 | async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> { |
615 | 6 | let mut inner = self.inner.lock().await; |
616 | 6 | inner |
617 | 6 | .immediate_evict_worker( |
618 | 6 | worker_id, |
619 | 6 | make_err!(Code::Internal, "Received request to remove worker"), |
620 | 6 | false, |
621 | 6 | ) |
622 | 6 | .await |
623 | 12 | } |
624 | | |
625 | 0 | async fn shutdown(&self, shutdown_guard: ShutdownGuard) { |
626 | 0 | let mut inner = self.inner.lock().await; |
627 | 0 | while let Some(worker_id) = inner Branch (627:19): [True: 0, False: 0]
Branch (627:19): [Folded - Ignored]
|
628 | 0 | .workers |
629 | 0 | .peek_lru() |
630 | 0 | .map(|(worker_id, _worker)| worker_id.clone()) |
631 | | { |
632 | 0 | if let Err(err) = inner Branch (632:20): [True: 0, False: 0]
Branch (632:20): [Folded - Ignored]
|
633 | 0 | .immediate_evict_worker( |
634 | 0 | &worker_id, |
635 | 0 | make_err!(Code::Internal, "Scheduler shutdown"), |
636 | 0 | true, |
637 | 0 | ) |
638 | 0 | .await |
639 | | { |
640 | 0 | error!(?err, "Error evicting worker on shutdown."); |
641 | 0 | } |
642 | | } |
643 | 0 | drop(shutdown_guard); |
644 | 0 | } |
645 | | |
646 | 10 | async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> { |
647 | 5 | let mut inner = self.inner.lock().await; |
648 | | |
649 | 5 | let mut result = Ok(()); |
650 | | // Items should be sorted based on last_update_timestamp, so we don't need to iterate the entire |
651 | | // map most of the time. |
652 | 5 | let worker_ids_to_remove: Vec<WorkerId> = inner |
653 | 5 | .workers |
654 | 5 | .iter() |
655 | 5 | .rev() |
656 | 6 | .map_while5 (|(worker_id, worker)| { |
657 | 6 | if worker.last_update_timestamp <= now_timestamp - self.worker_timeout_s { Branch (657:20): [True: 2, False: 4]
Branch (657:20): [Folded - Ignored]
|
658 | 2 | Some(worker_id.clone()) |
659 | | } else { |
660 | 4 | None |
661 | | } |
662 | 6 | }) |
663 | 5 | .collect(); |
664 | 7 | for worker_id2 in &worker_ids_to_remove { |
665 | 2 | warn!(?worker_id, "Worker timed out, removing from pool"); |
666 | 2 | result = result.merge( |
667 | 2 | inner |
668 | 2 | .immediate_evict_worker( |
669 | 2 | worker_id, |
670 | 2 | make_err!( |
671 | 2 | Code::Internal, |
672 | 2 | "Worker {worker_id} timed out, removing from pool" |
673 | 2 | ), |
674 | 2 | false, |
675 | 2 | ) |
676 | 2 | .await, |
677 | | ); |
678 | | } |
679 | | |
680 | 5 | result |
681 | 10 | } |
682 | | |
683 | 4 | async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> { |
684 | 2 | let mut inner = self.inner.lock().await; |
685 | 2 | inner.set_drain_worker(worker_id, is_draining).await |
686 | 4 | } |
687 | | } |
688 | | |
689 | | impl RootMetricsComponent for ApiWorkerScheduler {} |