/build/source/nativelink-scheduler/src/simple_scheduler_state_manager.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::ops::Bound; |
16 | | use std::string::ToString; |
17 | | use std::sync::{Arc, Weak}; |
18 | | use std::time::{Duration, SystemTime}; |
19 | | |
20 | | use async_lock::Mutex; |
21 | | use async_trait::async_trait; |
22 | | use futures::{future, stream, FutureExt, StreamExt, TryStreamExt}; |
23 | | use nativelink_error::{make_err, Code, Error, ResultExt}; |
24 | | use nativelink_metric::MetricsComponent; |
25 | | use nativelink_util::action_messages::{ |
26 | | ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ExecutionMetadata, |
27 | | OperationId, WorkerId, |
28 | | }; |
29 | | use nativelink_util::instant_wrapper::InstantWrapper; |
30 | | use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; |
31 | | use nativelink_util::operation_state_manager::{ |
32 | | ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager, |
33 | | OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType, WorkerStateManager, |
34 | | }; |
35 | | use tracing::{event, Level}; |
36 | | |
37 | | use super::awaited_action_db::{ |
38 | | AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedActionState, |
39 | | }; |
40 | | |
41 | | /// Maximum number of times an update to the database |
42 | | /// can fail before giving up. |
43 | | const MAX_UPDATE_RETRIES: usize = 5; |
44 | | |
45 | | /// Simple struct that implements the ActionStateResult trait and always returns an error. |
46 | | struct ErrorActionStateResult(Error); |
47 | | |
48 | | #[async_trait] |
49 | | impl ActionStateResult for ErrorActionStateResult { |
50 | 0 | async fn as_state(&self) -> Result<Arc<ActionState>, Error> { |
51 | 0 | Err(self.0.clone()) |
52 | 0 | } |
53 | | |
54 | 0 | async fn changed(&mut self) -> Result<Arc<ActionState>, Error> { |
55 | 0 | Err(self.0.clone()) |
56 | 0 | } |
57 | | |
58 | 0 | async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> { |
59 | 0 | Err(self.0.clone()) |
60 | 0 | } |
61 | | } |
62 | | |
63 | 45 | fn apply_filter_predicate(awaited_action: &AwaitedAction, filter: &OperationFilter) -> bool { |
64 | | // Note: The caller must filter `client_operation_id`. |
65 | | |
66 | 45 | if let Some(operation_id0 ) = &filter.operation_id { Branch (66:12): [True: 0, False: 45]
Branch (66:12): [Folded - Ignored]
|
67 | 0 | if operation_id != awaited_action.operation_id() { Branch (67:12): [True: 0, False: 0]
Branch (67:12): [Folded - Ignored]
|
68 | 0 | return false; |
69 | 0 | } |
70 | 45 | } |
71 | | |
72 | 45 | if filter.worker_id.is_some() && filter.worker_id != awaited_action.worker_id()0 { Branch (72:8): [True: 0, False: 45]
Branch (72:38): [True: 0, False: 0]
Branch (72:8): [Folded - Ignored]
Branch (72:38): [Folded - Ignored]
|
73 | 0 | return false; |
74 | 45 | } |
75 | | |
76 | | { |
77 | 45 | if let Some(filter_unique_key0 ) = &filter.unique_key { Branch (77:16): [True: 0, False: 45]
Branch (77:16): [Folded - Ignored]
|
78 | 0 | match &awaited_action.action_info().unique_qualifier { |
79 | 0 | ActionUniqueQualifier::Cachable(unique_key) => { |
80 | 0 | if filter_unique_key != unique_key { Branch (80:24): [True: 0, False: 0]
Branch (80:24): [Folded - Ignored]
|
81 | 0 | return false; |
82 | 0 | } |
83 | | } |
84 | | ActionUniqueQualifier::Uncachable(_) => { |
85 | 0 | return false; |
86 | | } |
87 | | } |
88 | 45 | } |
89 | 45 | if let Some(action_digest0 ) = filter.action_digest { Branch (89:16): [True: 0, False: 45]
Branch (89:16): [Folded - Ignored]
|
90 | 0 | if action_digest != awaited_action.action_info().digest() { Branch (90:16): [True: 0, False: 0]
Branch (90:16): [Folded - Ignored]
|
91 | 0 | return false; |
92 | 0 | } |
93 | 45 | } |
94 | | } |
95 | | |
96 | | { |
97 | 45 | let last_worker_update_timestamp = awaited_action.last_worker_updated_timestamp(); |
98 | 45 | if let Some(worker_update_before0 ) = filter.worker_update_before { Branch (98:16): [True: 0, False: 45]
Branch (98:16): [Folded - Ignored]
|
99 | 0 | if worker_update_before < last_worker_update_timestamp { Branch (99:16): [True: 0, False: 0]
Branch (99:16): [Folded - Ignored]
|
100 | 0 | return false; |
101 | 0 | } |
102 | 45 | } |
103 | 45 | if let Some(completed_before0 ) = filter.completed_before { Branch (103:16): [True: 0, False: 45]
Branch (103:16): [Folded - Ignored]
|
104 | 0 | if awaited_action.state().stage.is_finished() Branch (104:16): [True: 0, False: 0]
Branch (104:16): [Folded - Ignored]
|
105 | 0 | && completed_before < last_worker_update_timestamp Branch (105:20): [True: 0, False: 0]
Branch (105:20): [Folded - Ignored]
|
106 | | { |
107 | 0 | return false; |
108 | 0 | } |
109 | 45 | } |
110 | 45 | if filter.stages != OperationStageFlags::Any { Branch (110:12): [True: 42, False: 3]
Branch (110:12): [Folded - Ignored]
|
111 | 42 | let stage_flag = match awaited_action.state().stage { |
112 | 0 | ActionStage::Unknown => OperationStageFlags::Any, |
113 | 0 | ActionStage::CacheCheck => OperationStageFlags::CacheCheck, |
114 | 42 | ActionStage::Queued => OperationStageFlags::Queued, |
115 | 0 | ActionStage::Executing => OperationStageFlags::Executing, |
116 | 0 | ActionStage::Completed(_) => OperationStageFlags::Completed, |
117 | 0 | ActionStage::CompletedFromCache(_) => OperationStageFlags::Completed, |
118 | | }; |
119 | 42 | if !filter.stages.intersects(stage_flag) { Branch (119:16): [True: 0, False: 42]
Branch (119:16): [Folded - Ignored]
|
120 | 0 | return false; |
121 | 42 | } |
122 | 3 | } |
123 | | } |
124 | | |
125 | 45 | true |
126 | 45 | } |
127 | | |
128 | | struct ClientActionStateResult<U, T, I, NowFn> |
129 | | where |
130 | | U: AwaitedActionSubscriber, |
131 | | T: AwaitedActionDb, |
132 | | I: InstantWrapper, |
133 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
134 | | { |
135 | | inner: MatchingEngineActionStateResult<U, T, I, NowFn>, |
136 | | } |
137 | | |
138 | | impl<U, T, I, NowFn> ClientActionStateResult<U, T, I, NowFn> |
139 | | where |
140 | | U: AwaitedActionSubscriber, |
141 | | T: AwaitedActionDb, |
142 | | I: InstantWrapper, |
143 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
144 | | { |
145 | 28 | fn new( |
146 | 28 | sub: U, |
147 | 28 | simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>, |
148 | 28 | no_event_action_timeout: Duration, |
149 | 28 | now_fn: NowFn, |
150 | 28 | ) -> Self { |
151 | 28 | Self { |
152 | 28 | inner: MatchingEngineActionStateResult::new( |
153 | 28 | sub, |
154 | 28 | simple_scheduler_state_manager, |
155 | 28 | no_event_action_timeout, |
156 | 28 | now_fn, |
157 | 28 | ), |
158 | 28 | } |
159 | 28 | } |
160 | | } |
161 | | |
162 | | #[async_trait] |
163 | | impl<U, T, I, NowFn> ActionStateResult for ClientActionStateResult<U, T, I, NowFn> |
164 | | where |
165 | | U: AwaitedActionSubscriber, |
166 | | T: AwaitedActionDb, |
167 | | I: InstantWrapper, |
168 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
169 | | { |
170 | 3 | async fn as_state(&self) -> Result<Arc<ActionState>, Error> { |
171 | 3 | self.inner.as_state().await0 |
172 | 6 | } |
173 | | |
174 | 44 | async fn changed(&mut self) -> Result<Arc<ActionState>, Error> { |
175 | 1.49k | self.inner.changed()44 .await |
176 | 86 | } |
177 | | |
178 | 0 | async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> { |
179 | 0 | self.inner.as_action_info().await |
180 | 0 | } |
181 | | } |
182 | | |
183 | | struct MatchingEngineActionStateResult<U, T, I, NowFn> |
184 | | where |
185 | | U: AwaitedActionSubscriber, |
186 | | T: AwaitedActionDb, |
187 | | I: InstantWrapper, |
188 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
189 | | { |
190 | | awaited_action_sub: U, |
191 | | simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>, |
192 | | no_event_action_timeout: Duration, |
193 | | now_fn: NowFn, |
194 | | } |
195 | | impl<U, T, I, NowFn> MatchingEngineActionStateResult<U, T, I, NowFn> |
196 | | where |
197 | | U: AwaitedActionSubscriber, |
198 | | T: AwaitedActionDb, |
199 | | I: InstantWrapper, |
200 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
201 | | { |
202 | 70 | fn new( |
203 | 70 | awaited_action_sub: U, |
204 | 70 | simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>, |
205 | 70 | no_event_action_timeout: Duration, |
206 | 70 | now_fn: NowFn, |
207 | 70 | ) -> Self { |
208 | 70 | Self { |
209 | 70 | awaited_action_sub, |
210 | 70 | simple_scheduler_state_manager, |
211 | 70 | no_event_action_timeout, |
212 | 70 | now_fn, |
213 | 70 | } |
214 | 70 | } |
215 | | } |
216 | | |
217 | | #[async_trait] |
218 | | impl<U, T, I, NowFn> ActionStateResult for MatchingEngineActionStateResult<U, T, I, NowFn> |
219 | | where |
220 | | U: AwaitedActionSubscriber, |
221 | | T: AwaitedActionDb, |
222 | | I: InstantWrapper, |
223 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
224 | | { |
225 | 31 | async fn as_state(&self) -> Result<Arc<ActionState>, Error> { |
226 | 31 | Ok(self |
227 | 31 | .awaited_action_sub |
228 | 31 | .borrow() |
229 | 0 | .await |
230 | 31 | .err_tip(|| "In MatchingEngineActionStateResult::as_state"0 )?0 |
231 | 31 | .state() |
232 | 31 | .clone()) |
233 | 62 | } |
234 | | |
235 | 44 | async fn changed(&mut self) -> Result<Arc<ActionState>, Error> { |
236 | 44 | let mut timeout_attempts = 0; |
237 | | loop { |
238 | 54 | tokio::select! { |
239 | 54 | awaited_action_result42 = self.awaited_action_sub.changed() => { |
240 | 42 | return awaited_action_result |
241 | 42 | .err_tip(|| "In MatchingEngineActionStateResult::changed"0 ) |
242 | 42 | .map(|v| v.state().clone()); |
243 | | } |
244 | 54 | _ = (self.now_fn)().sleep(self.no_event_action_timeout) => { |
245 | 10 | // Timeout happened, do additional checks below. |
246 | 10 | } |
247 | | } |
248 | | |
249 | 10 | let awaited_action = self |
250 | 10 | .awaited_action_sub |
251 | 10 | .borrow() |
252 | 0 | .await |
253 | 10 | .err_tip(|| "In MatchingEngineActionStateResult::changed"0 )?0 ; |
254 | | |
255 | 10 | if matches!1 (awaited_action.state().stage, ActionStage::Queued) { |
256 | | // Actions in queued state do not get periodically updated, |
257 | | // so we don't need to timeout them. |
258 | 9 | continue; |
259 | 1 | } |
260 | | |
261 | 1 | let simple_scheduler_state_manager = self |
262 | 1 | .simple_scheduler_state_manager |
263 | 1 | .upgrade() |
264 | 1 | .err_tip(|| format!("Failed to upgrade weak reference to SimpleSchedulerStateManager in MatchingEngineActionStateResult::changed at attempt: {timeout_attempts}")0 )?0 ; |
265 | | |
266 | 1 | event!( |
267 | 1 | Level::WARN, |
268 | | ?awaited_action, |
269 | 1 | "OperationId {} / {} timed out after {} seconds issuing a retry", |
270 | 1 | awaited_action.operation_id(), |
271 | 1 | awaited_action.state().client_operation_id, |
272 | 1 | self.no_event_action_timeout.as_secs_f32(), |
273 | | ); |
274 | | |
275 | 1 | simple_scheduler_state_manager |
276 | 1 | .timeout_operation_id(awaited_action.operation_id()) |
277 | 0 | .await |
278 | 1 | .err_tip(|| "In MatchingEngineActionStateResult::changed"0 )?0 ; |
279 | | |
280 | 1 | if timeout_attempts >= MAX_UPDATE_RETRIES { Branch (280:16): [True: 0, False: 0]
Branch (280:16): [True: 0, False: 0]
Branch (280:16): [Folded - Ignored]
Branch (280:16): [True: 0, False: 1]
Branch (280:16): [True: 0, False: 0]
|
281 | 0 | return Err(make_err!( |
282 | 0 | Code::Internal, |
283 | 0 | "Failed to update action after {} retries with no error set in MatchingEngineActionStateResult::changed - {} {:?}", |
284 | 0 | MAX_UPDATE_RETRIES, |
285 | 0 | awaited_action.operation_id(), |
286 | 0 | awaited_action.state().stage, |
287 | 0 | )); |
288 | 1 | } |
289 | 1 | timeout_attempts += 1; |
290 | | } |
291 | 86 | } |
292 | | |
293 | 42 | async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> { |
294 | 42 | Ok(self |
295 | 42 | .awaited_action_sub |
296 | 42 | .borrow() |
297 | 0 | .await |
298 | 42 | .err_tip(|| "In MatchingEngineActionStateResult::as_action_info"0 )?0 |
299 | 42 | .action_info() |
300 | 42 | .clone()) |
301 | 84 | } |
302 | | } |
303 | | |
304 | | /// SimpleSchedulerStateManager is responsible for maintaining the state of the scheduler. |
305 | | /// Scheduler state includes the actions that are queued, active, and recently completed. |
306 | | /// It also includes the workers that are available to execute actions based on allocation |
307 | | /// strategy. |
308 | 0 | #[derive(MetricsComponent)] |
309 | | pub struct SimpleSchedulerStateManager<T, I, NowFn> |
310 | | where |
311 | | T: AwaitedActionDb, |
312 | | I: InstantWrapper, |
313 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
314 | | { |
315 | | /// Database for storing the state of all actions. |
316 | | #[metric(group = "action_db")] |
317 | | action_db: T, |
318 | | |
319 | | /// Maximum number of times a job can be retried. |
320 | | // TODO(allada) This should be a scheduler decorator instead |
321 | | // of always having it on every SimpleScheduler. |
322 | | #[metric(help = "Maximum number of times a job can be retried")] |
323 | | max_job_retries: usize, |
324 | | |
325 | | /// Duration after which an action is considered to be timed out if |
326 | | /// no event is received. |
327 | | #[metric( |
328 | | help = "Duration after which an action is considered to be timed out if no event is received" |
329 | | )] |
330 | | no_event_action_timeout: Duration, |
331 | | |
332 | | // A lock to ensure only one timeout operation is running at a time |
333 | | // on this service. |
334 | | timeout_operation_mux: Mutex<()>, |
335 | | |
336 | | /// Weak reference to self. |
337 | | // We use a weak reference to reduce the risk of a memory leak from |
338 | | // future changes. If this becomes some kind of perforamnce issue, |
339 | | // we can consider using a strong reference. |
340 | | weak_self: Weak<Self>, |
341 | | |
342 | | /// Function to get the current time. |
343 | | now_fn: NowFn, |
344 | | } |
345 | | |
346 | | impl<T, I, NowFn> SimpleSchedulerStateManager<T, I, NowFn> |
347 | | where |
348 | | T: AwaitedActionDb, |
349 | | I: InstantWrapper, |
350 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
351 | | { |
352 | 21 | pub fn new( |
353 | 21 | max_job_retries: usize, |
354 | 21 | no_event_action_timeout: Duration, |
355 | 21 | action_db: T, |
356 | 21 | now_fn: NowFn, |
357 | 21 | ) -> Arc<Self> { |
358 | 21 | Arc::new_cyclic(|weak_self| Self { |
359 | 21 | action_db, |
360 | 21 | max_job_retries, |
361 | 21 | no_event_action_timeout, |
362 | 21 | timeout_operation_mux: Mutex::new(()), |
363 | 21 | weak_self: weak_self.clone(), |
364 | 21 | now_fn, |
365 | 21 | }) |
366 | 21 | } |
367 | | |
368 | | /// Let the scheduler know that an operation has timed out from |
369 | | /// the client side (ie: worker has not updated in a while). |
370 | 1 | async fn timeout_operation_id(&self, operation_id: &OperationId) -> Result<(), Error> { |
371 | | // Ensure that only one timeout operation is running at a time. |
372 | | // Failing to do this could result in the same operation being |
373 | | // timed out multiple times at the same time. |
374 | | // Note: We could implement this on a per-operation_id basis, but it is quite |
375 | | // complex to manage the locks. |
376 | 1 | let _lock = self.timeout_operation_mux.lock().await0 ; |
377 | | |
378 | 1 | let awaited_action_subscriber = self |
379 | 1 | .action_db |
380 | 1 | .get_by_operation_id(operation_id) |
381 | 0 | .await |
382 | 1 | .err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id"0 )?0 |
383 | 1 | .err_tip(|| { |
384 | 0 | format!("Operation id {operation_id} does not exist in SimpleSchedulerStateManager::timeout_operation_id") |
385 | 1 | })?0 ; |
386 | | |
387 | 1 | let awaited_action = awaited_action_subscriber |
388 | 1 | .borrow() |
389 | 0 | .await |
390 | 1 | .err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id"0 )?0 ; |
391 | | |
392 | | // If the action is not executing, we should not timeout the action. |
393 | 1 | if !matches!0 (awaited_action.state().stage, ActionStage::Executing) { Branch (393:12): [True: 0, False: 0]
Branch (393:12): [True: 0, False: 0]
Branch (393:12): [Folded - Ignored]
Branch (393:12): [True: 0, False: 1]
Branch (393:12): [True: 0, False: 0]
|
394 | 0 | return Ok(()); |
395 | 1 | } |
396 | | |
397 | 1 | let last_worker_updated = awaited_action |
398 | 1 | .last_worker_updated_timestamp() |
399 | 1 | .duration_since(SystemTime::UNIX_EPOCH) |
400 | 1 | .map_err(|e| { |
401 | 0 | make_err!( |
402 | 0 | Code::Internal, |
403 | 0 | "Failed to convert last_worker_updated to duration since epoch {e:?}" |
404 | 0 | ) |
405 | 1 | })?0 ; |
406 | 1 | let worker_should_update_before = last_worker_updated |
407 | 1 | .checked_add(self.no_event_action_timeout) |
408 | 1 | .err_tip(|| "Timestamp too big in SimpleSchedulerStateManager::timeout_operation_id"0 )?0 ; |
409 | 1 | if worker_should_update_before < (self.now_fn)().elapsed() { Branch (409:12): [True: 0, False: 0]
Branch (409:12): [True: 0, False: 0]
Branch (409:12): [Folded - Ignored]
Branch (409:12): [True: 0, False: 1]
Branch (409:12): [True: 0, False: 0]
|
410 | | // The action was updated recently, we should not timeout the action. |
411 | | // This is to prevent timing out actions that have recently been updated |
412 | | // (like multiple clients timeout the same action at the same time). |
413 | 0 | return Ok(()); |
414 | 1 | } |
415 | 1 | |
416 | 1 | self.assign_operation( |
417 | 1 | operation_id, |
418 | 1 | Err(make_err!( |
419 | 1 | Code::DeadlineExceeded, |
420 | 1 | "Operation timed out after {} seconds", |
421 | 1 | self.no_event_action_timeout.as_secs_f32(), |
422 | 1 | )), |
423 | 1 | ) |
424 | 0 | .await |
425 | 1 | } |
426 | | |
427 | 41 | async fn inner_update_operation( |
428 | 41 | &self, |
429 | 41 | operation_id: &OperationId, |
430 | 41 | maybe_worker_id: Option<&WorkerId>, |
431 | 41 | update: UpdateOperationType, |
432 | 41 | ) -> Result<(), Error> { |
433 | 41 | let mut last_err = None; |
434 | 42 | for _ in 0..MAX_UPDATE_RETRIES { |
435 | 42 | let maybe_awaited_action_subscriber = self |
436 | 42 | .action_db |
437 | 42 | .get_by_operation_id(operation_id) |
438 | 0 | .await |
439 | 42 | .err_tip(|| "In SimpleSchedulerStateManager::update_operation"0 )?0 ; |
440 | 42 | let Some(awaited_action_subscriber41 ) = maybe_awaited_action_subscriber else { Branch (440:17): [True: 0, False: 0]
Branch (440:17): [True: 0, False: 0]
Branch (440:17): [Folded - Ignored]
Branch (440:17): [True: 39, False: 0]
Branch (440:17): [True: 2, False: 1]
|
441 | | // No action found. It is ok if the action was not found. It |
442 | | // probably means that the action was dropped, but worker was |
443 | | // still processing it. |
444 | 1 | return Ok(()); |
445 | | }; |
446 | | |
447 | 41 | let mut awaited_action = awaited_action_subscriber |
448 | 41 | .borrow() |
449 | 0 | .await |
450 | 41 | .err_tip(|| "In SimpleSchedulerStateManager::update_operation"0 )?0 ; |
451 | | |
452 | | // Make sure the worker id matches the awaited action worker id. |
453 | | // This might happen if the worker sending the update is not the |
454 | | // worker that was assigned. |
455 | 41 | if awaited_action.worker_id().is_some() Branch (455:16): [True: 0, False: 0]
Branch (455:16): [True: 0, False: 0]
Branch (455:16): [Folded - Ignored]
Branch (455:16): [True: 13, False: 26]
Branch (455:16): [True: 0, False: 2]
|
456 | 13 | && maybe_worker_id.is_some() Branch (456:20): [True: 0, False: 0]
Branch (456:20): [True: 0, False: 0]
Branch (456:20): [Folded - Ignored]
Branch (456:20): [True: 12, False: 1]
Branch (456:20): [True: 0, False: 0]
|
457 | 12 | && maybe_worker_id != awaited_action.worker_id().as_ref() Branch (457:20): [True: 0, False: 0]
Branch (457:20): [True: 0, False: 0]
Branch (457:20): [Folded - Ignored]
Branch (457:20): [True: 0, False: 12]
Branch (457:20): [True: 0, False: 0]
|
458 | | { |
459 | | // If another worker is already assigned to the action, another |
460 | | // worker probably picked up the action. We should not update the |
461 | | // action in this case and abort this operation. |
462 | 0 | let err = make_err!( |
463 | 0 | Code::Aborted, |
464 | 0 | "Worker ids do not match - {:?} != {:?} for {:?}", |
465 | 0 | maybe_worker_id, |
466 | 0 | awaited_action.worker_id(), |
467 | 0 | awaited_action, |
468 | 0 | ); |
469 | 0 | event!( |
470 | 0 | Level::INFO, |
471 | 0 | "Worker ids do not match - {:?} != {:?} for {:?}. This is probably due to another worker picking up the action.", |
472 | 0 | maybe_worker_id, |
473 | 0 | awaited_action.worker_id(), |
474 | | awaited_action, |
475 | | ); |
476 | 0 | return Err(err); |
477 | 41 | } |
478 | 41 | |
479 | 41 | // Make sure we don't update an action that is already completed. |
480 | 41 | if awaited_action.state().stage.is_finished() { Branch (480:16): [True: 0, False: 0]
Branch (480:16): [True: 0, False: 0]
Branch (480:16): [Folded - Ignored]
Branch (480:16): [True: 0, False: 39]
Branch (480:16): [True: 0, False: 2]
|
481 | 0 | return Err(make_err!( |
482 | 0 | Code::Internal, |
483 | 0 | "Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}", |
484 | 0 | awaited_action.state().stage, |
485 | 0 | maybe_worker_id, |
486 | 0 | )); |
487 | 41 | } |
488 | | |
489 | 41 | let stage = match &update { |
490 | | UpdateOperationType::KeepAlive => { |
491 | 0 | awaited_action.keep_alive((self.now_fn)().now()); |
492 | 0 | return self |
493 | 0 | .action_db |
494 | 0 | .update_awaited_action(awaited_action) |
495 | 0 | .await |
496 | 0 | .err_tip(|| "Failed to send KeepAlive in SimpleSchedulerStateManager::update_operation"); |
497 | | } |
498 | 33 | UpdateOperationType::UpdateWithActionStage(stage) => stage.clone(), |
499 | 8 | UpdateOperationType::UpdateWithError(err) => { |
500 | 8 | // Don't count a backpressure failure as an attempt for an action. |
501 | 8 | let due_to_backpressure = err.code == Code::ResourceExhausted; |
502 | 8 | if !due_to_backpressure { Branch (502:24): [True: 0, False: 0]
Branch (502:24): [True: 0, False: 0]
Branch (502:24): [Folded - Ignored]
Branch (502:24): [True: 8, False: 0]
Branch (502:24): [True: 0, False: 0]
|
503 | 8 | awaited_action.attempts += 1; |
504 | 8 | }0 |
505 | | |
506 | 8 | if awaited_action.attempts > self.max_job_retries { Branch (506:24): [True: 0, False: 0]
Branch (506:24): [True: 0, False: 0]
Branch (506:24): [Folded - Ignored]
Branch (506:24): [True: 1, False: 7]
Branch (506:24): [True: 0, False: 0]
|
507 | 1 | ActionStage::Completed(ActionResult { |
508 | 1 | execution_metadata: ExecutionMetadata { |
509 | 1 | worker: maybe_worker_id.map_or_else(String::default, ToString::to_string), |
510 | 1 | ..ExecutionMetadata::default() |
511 | 1 | }, |
512 | 1 | error: Some(err.clone().merge(make_err!( |
513 | 1 | Code::Internal, |
514 | 1 | "Job cancelled because it attempted to execute too many times {} > {} times {}", |
515 | 1 | awaited_action.attempts, |
516 | 1 | self.max_job_retries, |
517 | 1 | format!("for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"), |
518 | 1 | ))), |
519 | 1 | ..ActionResult::default() |
520 | 1 | }) |
521 | | } else { |
522 | 7 | ActionStage::Queued |
523 | | } |
524 | | } |
525 | | }; |
526 | 41 | let now = (self.now_fn)().now(); |
527 | 41 | if matches!34 (stage, ActionStage::Queued) { |
528 | 7 | // If the action is queued, we need to unset the worker id regardless of |
529 | 7 | // which worker sent the update. |
530 | 7 | awaited_action.set_worker_id(None, now); |
531 | 34 | } else { |
532 | 34 | awaited_action.set_worker_id(maybe_worker_id.copied(), now); |
533 | 34 | } |
534 | 41 | awaited_action.set_state( |
535 | 41 | Arc::new(ActionState { |
536 | 41 | stage, |
537 | 41 | // Client id is not known here, it is the responsibility of |
538 | 41 | // the the subscriber impl to replace this with the |
539 | 41 | // correct client id. |
540 | 41 | client_operation_id: operation_id.clone(), |
541 | 41 | action_digest: awaited_action.action_info().digest(), |
542 | 41 | }), |
543 | 41 | Some(now), |
544 | 41 | ); |
545 | | |
546 | 41 | let update_action_result = self |
547 | 41 | .action_db |
548 | 41 | .update_awaited_action(awaited_action) |
549 | 0 | .await |
550 | 41 | .err_tip(|| "In SimpleSchedulerStateManager::update_operation"2 ); |
551 | 41 | if let Err(err2 ) = update_action_result { Branch (551:20): [True: 0, False: 0]
Branch (551:20): [True: 0, False: 0]
Branch (551:20): [Folded - Ignored]
Branch (551:20): [True: 0, False: 39]
Branch (551:20): [True: 2, False: 0]
|
552 | | // We use Aborted to signal that the action was not |
553 | | // updated due to the data being set was not the latest |
554 | | // but can be retried. |
555 | 2 | if err.code == Code::Aborted { Branch (555:20): [True: 0, False: 0]
Branch (555:20): [True: 0, False: 0]
Branch (555:20): [Folded - Ignored]
Branch (555:20): [True: 0, False: 0]
Branch (555:20): [True: 1, False: 1]
|
556 | 1 | last_err = Some(err); |
557 | 1 | continue; |
558 | | } else { |
559 | 1 | return Err(err); |
560 | | } |
561 | 39 | } |
562 | 39 | return Ok(()); |
563 | | } |
564 | 0 | match last_err { |
565 | 0 | Some(err) => Err(err), |
566 | 0 | None => Err(make_err!( |
567 | 0 | Code::Internal, |
568 | 0 | "Failed to update action after {} retries with no error set", |
569 | 0 | MAX_UPDATE_RETRIES, |
570 | 0 | )), |
571 | | } |
572 | 41 | } |
573 | | |
574 | 25 | async fn inner_add_operation( |
575 | 25 | &self, |
576 | 25 | new_client_operation_id: OperationId, |
577 | 25 | action_info: Arc<ActionInfo>, |
578 | 25 | ) -> Result<T::Subscriber, Error> { |
579 | 25 | self.action_db |
580 | 25 | .add_action(new_client_operation_id, action_info) |
581 | 0 | .await |
582 | 25 | .err_tip(|| "In SimpleSchedulerStateManager::add_operation"0 ) |
583 | 25 | } |
584 | | |
585 | 594 | async fn inner_filter_operations<'a, F>( |
586 | 594 | &'a self, |
587 | 594 | filter: OperationFilter, |
588 | 594 | to_action_state_result: F, |
589 | 594 | ) -> Result<ActionStateResultStream<'a>, Error> |
590 | 594 | where |
591 | 594 | F: Fn(T::Subscriber) -> Box<dyn ActionStateResult> + Send + Sync + 'a, |
592 | 594 | { |
593 | 91 | fn sorted_awaited_action_state_for_flags( |
594 | 91 | stage: OperationStageFlags, |
595 | 91 | ) -> Option<SortedAwaitedActionState> { |
596 | 91 | match stage { |
597 | 0 | OperationStageFlags::CacheCheck => Some(SortedAwaitedActionState::CacheCheck), |
598 | 91 | OperationStageFlags::Queued => Some(SortedAwaitedActionState::Queued), |
599 | 0 | OperationStageFlags::Executing => Some(SortedAwaitedActionState::Executing), |
600 | 0 | OperationStageFlags::Completed => Some(SortedAwaitedActionState::Completed), |
601 | 0 | _ => None, |
602 | | } |
603 | 91 | } |
604 | | |
605 | 594 | if let Some(operation_id0 ) = &filter.operation_id { Branch (605:16): [True: 0, False: 0]
Branch (605:16): [True: 0, False: 0]
Branch (605:16): [True: 0, False: 0]
Branch (605:16): [True: 0, False: 0]
Branch (605:16): [Folded - Ignored]
Branch (605:16): [True: 0, False: 503]
Branch (605:16): [True: 0, False: 87]
Branch (605:16): [True: 0, False: 0]
Branch (605:16): [True: 0, False: 4]
|
606 | 0 | let maybe_subscriber = self |
607 | 0 | .action_db |
608 | 0 | .get_by_operation_id(operation_id) |
609 | 0 | .await |
610 | 0 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?; |
611 | 0 | let Some(subscriber) = maybe_subscriber else { Branch (611:17): [True: 0, False: 0]
Branch (611:17): [True: 0, False: 0]
Branch (611:17): [True: 0, False: 0]
Branch (611:17): [True: 0, False: 0]
Branch (611:17): [Folded - Ignored]
Branch (611:17): [True: 0, False: 0]
Branch (611:17): [True: 0, False: 0]
Branch (611:17): [True: 0, False: 0]
Branch (611:17): [True: 0, False: 0]
|
612 | 0 | return Ok(Box::pin(stream::empty())); |
613 | | }; |
614 | 0 | let awaited_action = subscriber |
615 | 0 | .borrow() |
616 | 0 | .await |
617 | 0 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?; |
618 | 0 | if !apply_filter_predicate(&awaited_action, &filter) { Branch (618:16): [True: 0, False: 0]
Branch (618:16): [True: 0, False: 0]
Branch (618:16): [True: 0, False: 0]
Branch (618:16): [True: 0, False: 0]
Branch (618:16): [Folded - Ignored]
Branch (618:16): [True: 0, False: 0]
Branch (618:16): [True: 0, False: 0]
Branch (618:16): [True: 0, False: 0]
Branch (618:16): [True: 0, False: 0]
|
619 | 0 | return Ok(Box::pin(stream::empty())); |
620 | 0 | } |
621 | 0 | return Ok(Box::pin(stream::once(async move { |
622 | 0 | to_action_state_result(subscriber) |
623 | 0 | }))); |
624 | 594 | } |
625 | 594 | if let Some(client_operation_id503 ) = &filter.client_operation_id { Branch (625:16): [True: 0, False: 0]
Branch (625:16): [True: 0, False: 0]
Branch (625:16): [True: 0, False: 0]
Branch (625:16): [True: 0, False: 0]
Branch (625:16): [Folded - Ignored]
Branch (625:16): [True: 503, False: 0]
Branch (625:16): [True: 0, False: 87]
Branch (625:16): [True: 0, False: 0]
Branch (625:16): [True: 0, False: 4]
|
626 | 503 | let maybe_subscriber = self |
627 | 503 | .action_db |
628 | 503 | .get_awaited_action_by_id(client_operation_id) |
629 | 0 | .await |
630 | 503 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations"0 )?0 ; |
631 | 503 | let Some(subscriber3 ) = maybe_subscriber else { Branch (631:17): [True: 0, False: 0]
Branch (631:17): [True: 0, False: 0]
Branch (631:17): [True: 0, False: 0]
Branch (631:17): [True: 0, False: 0]
Branch (631:17): [Folded - Ignored]
Branch (631:17): [True: 3, False: 500]
Branch (631:17): [True: 0, False: 0]
Branch (631:17): [True: 0, False: 0]
Branch (631:17): [True: 0, False: 0]
|
632 | 500 | return Ok(Box::pin(stream::empty())); |
633 | | }; |
634 | 3 | let awaited_action = subscriber |
635 | 3 | .borrow() |
636 | 0 | .await |
637 | 3 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations"0 )?0 ; |
638 | 3 | if !apply_filter_predicate(&awaited_action, &filter) { Branch (638:16): [True: 0, False: 0]
Branch (638:16): [True: 0, False: 0]
Branch (638:16): [True: 0, False: 0]
Branch (638:16): [True: 0, False: 0]
Branch (638:16): [Folded - Ignored]
Branch (638:16): [True: 0, False: 3]
Branch (638:16): [True: 0, False: 0]
Branch (638:16): [True: 0, False: 0]
Branch (638:16): [True: 0, False: 0]
|
639 | 0 | return Ok(Box::pin(stream::empty())); |
640 | 3 | } |
641 | 3 | return Ok(Box::pin(stream::once(async move { |
642 | 3 | to_action_state_result(subscriber) |
643 | 3 | }))); |
644 | 91 | } |
645 | | |
646 | 91 | let Some(sorted_awaited_action_state) = Branch (646:13): [True: 0, False: 0]
Branch (646:13): [True: 0, False: 0]
Branch (646:13): [True: 0, False: 0]
Branch (646:13): [True: 0, False: 0]
Branch (646:13): [Folded - Ignored]
Branch (646:13): [True: 0, False: 0]
Branch (646:13): [True: 87, False: 0]
Branch (646:13): [True: 0, False: 0]
Branch (646:13): [True: 4, False: 0]
|
647 | 91 | sorted_awaited_action_state_for_flags(filter.stages) |
648 | | else { |
649 | 0 | let mut all_items: Vec<_> = self |
650 | 0 | .action_db |
651 | 0 | .get_all_awaited_actions() |
652 | 0 | .await |
653 | 0 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")? |
654 | 0 | .and_then(|awaited_action_subscriber| async move { |
655 | 0 | let awaited_action = awaited_action_subscriber |
656 | 0 | .borrow() |
657 | 0 | .await |
658 | 0 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?; |
659 | 0 | Ok((awaited_action_subscriber, awaited_action)) |
660 | 0 | }) |
661 | 0 | .try_filter_map(|(subscriber, awaited_action)| { |
662 | 0 | if apply_filter_predicate(&awaited_action, &filter) { Branch (662:24): [True: 0, False: 0]
Branch (662:24): [True: 0, False: 0]
Branch (662:24): [True: 0, False: 0]
Branch (662:24): [True: 0, False: 0]
Branch (662:24): [Folded - Ignored]
Branch (662:24): [True: 0, False: 0]
Branch (662:24): [True: 0, False: 0]
Branch (662:24): [True: 0, False: 0]
Branch (662:24): [True: 0, False: 0]
|
663 | 0 | future::ready(Ok(Some((subscriber, awaited_action.sort_key())))) |
664 | 0 | .left_future() |
665 | | } else { |
666 | 0 | future::ready(Result::<_, Error>::Ok(None)).right_future() |
667 | | } |
668 | 0 | }) |
669 | 0 | .try_collect() |
670 | 0 | .await |
671 | 0 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?; |
672 | 0 | match filter.order_by_priority_direction { |
673 | 0 | Some(OrderDirection::Asc) => all_items.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)), |
674 | 0 | Some(OrderDirection::Desc) => all_items.sort_unstable_by(|(_, a), (_, b)| b.cmp(a)), |
675 | 0 | None => {} |
676 | | } |
677 | 0 | return Ok(Box::pin(stream::iter( |
678 | 0 | all_items |
679 | 0 | .into_iter() |
680 | 0 | .map(move |(subscriber, _)| to_action_state_result(subscriber)), |
681 | 0 | ))); |
682 | | }; |
683 | | |
684 | 91 | let desc = matches!0 ( |
685 | 91 | filter.order_by_priority_direction, |
686 | | Some(OrderDirection::Desc) |
687 | | ); |
688 | 91 | let stream = self |
689 | 91 | .action_db |
690 | 91 | .get_range_of_actions( |
691 | 91 | sorted_awaited_action_state, |
692 | 91 | Bound::Unbounded, |
693 | 91 | Bound::Unbounded, |
694 | 91 | desc, |
695 | 91 | ) |
696 | 0 | .await |
697 | 91 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations"0 )?0 |
698 | 91 | .and_then(|awaited_action_subscriber| async move {42 |
699 | 42 | let awaited_action = awaited_action_subscriber |
700 | 42 | .borrow() |
701 | 0 | .await |
702 | 42 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations"0 )?0 ; |
703 | 42 | Ok((awaited_action_subscriber, awaited_action)) |
704 | 91 | }84 ) |
705 | 91 | .try_filter_map(move |(subscriber, awaited_action)| { |
706 | 42 | if apply_filter_predicate(&awaited_action, &filter) { Branch (706:20): [True: 0, False: 0]
Branch (706:20): [True: 0, False: 0]
Branch (706:20): [True: 0, False: 0]
Branch (706:20): [True: 0, False: 0]
Branch (706:20): [Folded - Ignored]
Branch (706:20): [True: 0, False: 0]
Branch (706:20): [True: 40, False: 0]
Branch (706:20): [True: 0, False: 0]
Branch (706:20): [True: 2, False: 0]
|
707 | 42 | future::ready(Ok(Some(subscriber))).left_future() |
708 | | } else { |
709 | 0 | future::ready(Result::<_, Error>::Ok(None)).right_future() |
710 | | } |
711 | 91 | }42 ) |
712 | 91 | .map(move |result| -> Box<dyn ActionStateResult> { |
713 | 42 | result.map_or_else( |
714 | 42 | |e| -> Box<dyn ActionStateResult> { Box::new(ErrorActionStateResult(e)) }0 , |
715 | 42 | |v| -> Box<dyn ActionStateResult> { to_action_state_result(v) }, |
716 | 42 | ) |
717 | 91 | }); |
718 | 91 | Ok(Box::pin(stream)) |
719 | 594 | } |
720 | | } |
721 | | |
722 | | #[async_trait] |
723 | | impl<T, I, NowFn> ClientStateManager for SimpleSchedulerStateManager<T, I, NowFn> |
724 | | where |
725 | | T: AwaitedActionDb, |
726 | | I: InstantWrapper, |
727 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
728 | | { |
729 | | async fn add_action( |
730 | | &self, |
731 | | client_operation_id: OperationId, |
732 | | action_info: Arc<ActionInfo>, |
733 | 25 | ) -> Result<Box<dyn ActionStateResult>, Error> { |
734 | 25 | let sub = self |
735 | 25 | .inner_add_operation(client_operation_id.clone(), action_info.clone()) |
736 | 0 | .await?; |
737 | | |
738 | 25 | Ok(Box::new(ClientActionStateResult::new( |
739 | 25 | sub, |
740 | 25 | self.weak_self.clone(), |
741 | 25 | self.no_event_action_timeout, |
742 | 25 | self.now_fn.clone(), |
743 | 25 | ))) |
744 | 50 | } |
745 | | |
746 | | async fn filter_operations<'a>( |
747 | | &'a self, |
748 | | filter: OperationFilter, |
749 | 503 | ) -> Result<ActionStateResultStream<'a>, Error> { |
750 | 503 | self.inner_filter_operations(filter, move |rx| { |
751 | 3 | Box::new(ClientActionStateResult::new( |
752 | 3 | rx, |
753 | 3 | self.weak_self.clone(), |
754 | 3 | self.no_event_action_timeout, |
755 | 3 | self.now_fn.clone(), |
756 | 3 | )) |
757 | 503 | }) |
758 | 0 | .await |
759 | 1.00k | } |
760 | | |
761 | 0 | fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> { |
762 | 0 | None |
763 | 0 | } |
764 | | } |
765 | | |
766 | | #[async_trait] |
767 | | impl<T, I, NowFn> WorkerStateManager for SimpleSchedulerStateManager<T, I, NowFn> |
768 | | where |
769 | | T: AwaitedActionDb, |
770 | | I: InstantWrapper, |
771 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
772 | | { |
773 | | async fn update_operation( |
774 | | &self, |
775 | | operation_id: &OperationId, |
776 | | worker_id: &WorkerId, |
777 | | update: UpdateOperationType, |
778 | 12 | ) -> Result<(), Error> { |
779 | 12 | self.inner_update_operation(operation_id, Some(worker_id), update) |
780 | 0 | .await |
781 | 24 | } |
782 | | } |
783 | | |
784 | | #[async_trait] |
785 | | impl<T, I, NowFn> MatchingEngineStateManager for SimpleSchedulerStateManager<T, I, NowFn> |
786 | | where |
787 | | T: AwaitedActionDb, |
788 | | I: InstantWrapper, |
789 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
790 | | { |
791 | | async fn filter_operations<'a>( |
792 | | &'a self, |
793 | | filter: OperationFilter, |
794 | 91 | ) -> Result<ActionStateResultStream<'a>, Error> { |
795 | 91 | self.inner_filter_operations(filter, |rx| { |
796 | 42 | Box::new(MatchingEngineActionStateResult::new( |
797 | 42 | rx, |
798 | 42 | self.weak_self.clone(), |
799 | 42 | self.no_event_action_timeout, |
800 | 42 | self.now_fn.clone(), |
801 | 42 | )) |
802 | 91 | }) |
803 | 0 | .await |
804 | 182 | } |
805 | | |
806 | | async fn assign_operation( |
807 | | &self, |
808 | | operation_id: &OperationId, |
809 | | worker_id_or_reason_for_unsassign: Result<&WorkerId, Error>, |
810 | 29 | ) -> Result<(), Error> { |
811 | 29 | let (maybe_worker_id, update) = match worker_id_or_reason_for_unsassign { |
812 | 28 | Ok(worker_id) => ( |
813 | 28 | Some(worker_id), |
814 | 28 | UpdateOperationType::UpdateWithActionStage(ActionStage::Executing), |
815 | 28 | ), |
816 | 1 | Err(err) => (None, UpdateOperationType::UpdateWithError(err)), |
817 | | }; |
818 | 29 | self.inner_update_operation(operation_id, maybe_worker_id, update) |
819 | 0 | .await |
820 | 58 | } |
821 | | } |