/build/source/nativelink-scheduler/src/simple_scheduler_state_manager.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::ops::Bound; |
16 | | use core::time::Duration; |
17 | | use std::string::ToString; |
18 | | use std::sync::{Arc, Weak}; |
19 | | |
20 | | use async_lock::Mutex; |
21 | | use async_trait::async_trait; |
22 | | use futures::{StreamExt, TryStreamExt, stream}; |
23 | | use nativelink_error::{Code, Error, ResultExt, make_err}; |
24 | | use nativelink_metric::MetricsComponent; |
25 | | use nativelink_util::action_messages::{ |
26 | | ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ExecutionMetadata, |
27 | | OperationId, WorkerId, |
28 | | }; |
29 | | use nativelink_util::instant_wrapper::InstantWrapper; |
30 | | use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; |
31 | | use nativelink_util::metrics::{ |
32 | | EXECUTION_METRICS, EXECUTION_RESULT, EXECUTION_STAGE, ExecutionResult, ExecutionStage, |
33 | | }; |
34 | | use nativelink_util::operation_state_manager::{ |
35 | | ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager, |
36 | | OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType, WorkerStateManager, |
37 | | }; |
38 | | use nativelink_util::origin_event::OriginMetadata; |
39 | | use opentelemetry::KeyValue; |
40 | | use tracing::{debug, info, trace, warn}; |
41 | | |
42 | | use super::awaited_action_db::{ |
43 | | AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedActionState, |
44 | | }; |
45 | | use crate::worker_registry::SharedWorkerRegistry; |
46 | | |
/// Maximum number of times an update to the database
/// can fail before giving up.
// Also used in this file as the cap on repeated timeout attempts in
// `MatchingEngineActionStateResult::changed` and as the retry bound when
// marking an action as timed out in `apply_filter_predicate`.
const MAX_UPDATE_RETRIES: usize = 5;

/// Base delay for exponential backoff on version conflicts (in ms).
// `inner_update_operation` multiplies this by `1 << n`, with `n` clamped
// to at most 4, so the backoff part of the delay never exceeds 16x this value.
const BASE_RETRY_DELAY_MS: u64 = 10;

/// Maximum jitter to add to retry delay (in ms).
// The jitter is computed modulo this value, so it is always strictly
// smaller than it.
const MAX_RETRY_JITTER_MS: u64 = 20;
56 | | |
57 | | /// Simple struct that implements the `ActionStateResult` trait and always returns an error. |
58 | | struct ErrorActionStateResult(Error); |
59 | | |
60 | | #[async_trait] |
61 | | impl ActionStateResult for ErrorActionStateResult { |
62 | 0 | async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> { |
63 | | Err(self.0.clone()) |
64 | 0 | } |
65 | | |
66 | 0 | async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> { |
67 | | Err(self.0.clone()) |
68 | 0 | } |
69 | | |
70 | 0 | async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> { |
71 | | Err(self.0.clone()) |
72 | 0 | } |
73 | | } |
74 | | |
75 | | struct ClientActionStateResult<U, T, I, NowFn> |
76 | | where |
77 | | U: AwaitedActionSubscriber, |
78 | | T: AwaitedActionDb, |
79 | | I: InstantWrapper, |
80 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
81 | | { |
82 | | inner: MatchingEngineActionStateResult<U, T, I, NowFn>, |
83 | | } |
84 | | |
85 | | impl<U, T, I, NowFn> ClientActionStateResult<U, T, I, NowFn> |
86 | | where |
87 | | U: AwaitedActionSubscriber, |
88 | | T: AwaitedActionDb, |
89 | | I: InstantWrapper, |
90 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
91 | | { |
92 | 536 | const fn new( |
93 | 536 | sub: U, |
94 | 536 | simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>, |
95 | 536 | no_event_action_timeout: Duration, |
96 | 536 | now_fn: NowFn, |
97 | 536 | ) -> Self { |
98 | 536 | Self { |
99 | 536 | inner: MatchingEngineActionStateResult::new( |
100 | 536 | sub, |
101 | 536 | simple_scheduler_state_manager, |
102 | 536 | no_event_action_timeout, |
103 | 536 | now_fn, |
104 | 536 | ), |
105 | 536 | } |
106 | 536 | } |
107 | | } |
108 | | |
109 | | #[async_trait] |
110 | | impl<U, T, I, NowFn> ActionStateResult for ClientActionStateResult<U, T, I, NowFn> |
111 | | where |
112 | | U: AwaitedActionSubscriber, |
113 | | T: AwaitedActionDb, |
114 | | I: InstantWrapper, |
115 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
116 | | { |
117 | 6 | async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> { |
118 | | self.inner.as_state().await |
119 | 6 | } |
120 | | |
121 | 54 | async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> { |
122 | | self.inner.changed().await |
123 | 54 | } |
124 | | |
125 | 0 | async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> { |
126 | | self.inner.as_action_info().await |
127 | 0 | } |
128 | | } |
129 | | |
130 | | struct MatchingEngineActionStateResult<U, T, I, NowFn> |
131 | | where |
132 | | U: AwaitedActionSubscriber, |
133 | | T: AwaitedActionDb, |
134 | | I: InstantWrapper, |
135 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
136 | | { |
137 | | awaited_action_sub: U, |
138 | | simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>, |
139 | | no_event_action_timeout: Duration, |
140 | | now_fn: NowFn, |
141 | | } |
142 | | impl<U, T, I, NowFn> MatchingEngineActionStateResult<U, T, I, NowFn> |
143 | | where |
144 | | U: AwaitedActionSubscriber, |
145 | | T: AwaitedActionDb, |
146 | | I: InstantWrapper, |
147 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
148 | | { |
149 | 593 | const fn new( |
150 | 593 | awaited_action_sub: U, |
151 | 593 | simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>, |
152 | 593 | no_event_action_timeout: Duration, |
153 | 593 | now_fn: NowFn, |
154 | 593 | ) -> Self { |
155 | 593 | Self { |
156 | 593 | awaited_action_sub, |
157 | 593 | simple_scheduler_state_manager, |
158 | 593 | no_event_action_timeout, |
159 | 593 | now_fn, |
160 | 593 | } |
161 | 593 | } |
162 | | } |
163 | | |
164 | | #[async_trait] |
165 | | impl<U, T, I, NowFn> ActionStateResult for MatchingEngineActionStateResult<U, T, I, NowFn> |
166 | | where |
167 | | U: AwaitedActionSubscriber, |
168 | | T: AwaitedActionDb, |
169 | | I: InstantWrapper, |
170 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
171 | | { |
172 | 44 | async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> { |
173 | | let awaited_action = self |
174 | | .awaited_action_sub |
175 | | .borrow() |
176 | | .await |
177 | | .err_tip(|| "In MatchingEngineActionStateResult::as_state")?; |
178 | | Ok(( |
179 | | awaited_action.state().clone(), |
180 | | awaited_action.maybe_origin_metadata().cloned(), |
181 | | )) |
182 | 44 | } |
183 | | |
184 | 54 | async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> { |
185 | | let mut timeout_attempts = 0; |
186 | | loop { |
187 | | tokio::select! { |
188 | | awaited_action_result = self.awaited_action_sub.changed() => { |
189 | | return awaited_action_result |
190 | | .err_tip(|| "In MatchingEngineActionStateResult::changed") |
191 | 50 | .map(|v| (v.state().clone(), v.maybe_origin_metadata().cloned())); |
192 | | } |
193 | | () = (self.now_fn)().sleep(self.no_event_action_timeout) => { |
194 | | // Timeout happened, do additional checks below. |
195 | | } |
196 | | } |
197 | | |
198 | | let awaited_action = self |
199 | | .awaited_action_sub |
200 | | .borrow() |
201 | | .await |
202 | | .err_tip(|| "In MatchingEngineActionStateResult::changed")?; |
203 | | |
204 | | if matches!(awaited_action.state().stage, ActionStage::Queued) { |
205 | | // Actions in queued state do not get periodically updated, |
206 | | // so we don't need to timeout them. |
207 | | continue; |
208 | | } |
209 | | |
210 | | let simple_scheduler_state_manager = self |
211 | | .simple_scheduler_state_manager |
212 | | .upgrade() |
213 | 0 | .err_tip(|| format!("Failed to upgrade weak reference to SimpleSchedulerStateManager in MatchingEngineActionStateResult::changed at attempt: {timeout_attempts}"))?; |
214 | | |
215 | | // Check if worker is alive via registry before timing out. |
216 | | let should_timeout = simple_scheduler_state_manager |
217 | | .should_timeout_operation(&awaited_action) |
218 | | .await; |
219 | | |
220 | | if !should_timeout { |
221 | | // Worker is alive, continue waiting for updates |
222 | | trace!( |
223 | | operation_id = %awaited_action.operation_id(), |
224 | | "Operation timeout check passed, worker is alive" |
225 | | ); |
226 | | continue; |
227 | | } |
228 | | |
229 | | warn!( |
230 | | ?awaited_action, |
231 | | "OperationId {} / {} timed out after {} seconds issuing a retry", |
232 | | awaited_action.operation_id(), |
233 | | awaited_action.state().client_operation_id, |
234 | | self.no_event_action_timeout.as_secs_f32(), |
235 | | ); |
236 | | |
237 | | simple_scheduler_state_manager |
238 | | .timeout_operation_id(awaited_action.operation_id()) |
239 | | .await |
240 | | .err_tip(|| "In MatchingEngineActionStateResult::changed")?; |
241 | | |
242 | | if timeout_attempts >= MAX_UPDATE_RETRIES { |
243 | | return Err(make_err!( |
244 | | Code::Internal, |
245 | | "Failed to update action after {} retries with no error set in MatchingEngineActionStateResult::changed - {} {:?}", |
246 | | MAX_UPDATE_RETRIES, |
247 | | awaited_action.operation_id(), |
248 | | awaited_action.state().stage, |
249 | | )); |
250 | | } |
251 | | timeout_attempts += 1; |
252 | | } |
253 | 54 | } |
254 | | |
255 | 57 | async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> { |
256 | | let awaited_action = self |
257 | | .awaited_action_sub |
258 | | .borrow() |
259 | | .await |
260 | | .err_tip(|| "In MatchingEngineActionStateResult::as_action_info")?; |
261 | | Ok(( |
262 | | awaited_action.action_info().clone(), |
263 | | awaited_action.maybe_origin_metadata().cloned(), |
264 | | )) |
265 | 57 | } |
266 | | } |
267 | | |
/// `SimpleSchedulerStateManager` is responsible for maintaining the state of the scheduler.
/// Scheduler state includes the actions that are queued, active, and recently completed.
/// It also includes the workers that are available to execute actions based on allocation
/// strategy.
#[derive(MetricsComponent, Debug)]
pub struct SimpleSchedulerStateManager<T, I, NowFn>
where
    T: AwaitedActionDb,
    I: InstantWrapper,
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
{
    /// Database for storing the state of all actions.
    #[metric(group = "action_db")]
    action_db: T,

    /// Maximum number of times a job can be retried.
    // TODO(palfrey) This should be a scheduler decorator instead
    // of always having it on every SimpleScheduler.
    #[metric(help = "Maximum number of times a job can be retried")]
    max_job_retries: usize,

    /// Duration after which an action is considered to be timed out if
    /// no event is received.
    #[metric(
        help = "Duration after which an action is considered to be timed out if no event is received"
    )]
    no_event_action_timeout: Duration,

    /// Mark an operation as timed out if no client keepalive has been
    /// recorded for this duration. When the last client keepalive is older
    /// than this, `apply_filter_predicate` completes an unfinished operation
    /// with `DeadlineExceeded` and excludes it from the filtered results.
    client_action_timeout: Duration,

    /// Maximum time an action can stay in Executing state without any worker
    /// update, regardless of worker keepalive status. `Duration::ZERO` disables.
    // Measured from the action's last worker update timestamp and only
    // consulted when the worker registry reports the worker as alive.
    max_executing_timeout: Duration,

    // A lock to ensure only one timeout operation is running at a time
    // on this service.
    timeout_operation_mux: Mutex<()>,

    /// Weak reference to self.
    // We use a weak reference to reduce the risk of a memory leak from
    // future changes. If this becomes some kind of performance issue,
    // we can consider using a strong reference.
    weak_self: Weak<Self>,

    /// Function to get the current time.
    now_fn: NowFn,

    /// Worker registry for checking worker liveness.
    // When `None`, liveness falls back to the last worker update timestamp
    // compared against `no_event_action_timeout`.
    worker_registry: Option<SharedWorkerRegistry>,
}
321 | | |
322 | | impl<T, I, NowFn> SimpleSchedulerStateManager<T, I, NowFn> |
323 | | where |
324 | | T: AwaitedActionDb, |
325 | | I: InstantWrapper, |
326 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
327 | | { |
328 | 28 | pub fn new( |
329 | 28 | max_job_retries: usize, |
330 | 28 | no_event_action_timeout: Duration, |
331 | 28 | client_action_timeout: Duration, |
332 | 28 | max_executing_timeout: Duration, |
333 | 28 | action_db: T, |
334 | 28 | now_fn: NowFn, |
335 | 28 | worker_registry: Option<SharedWorkerRegistry>, |
336 | 28 | ) -> Arc<Self> { |
337 | 28 | Arc::new_cyclic(|weak_self| Self { |
338 | 28 | action_db, |
339 | 28 | max_job_retries, |
340 | 28 | no_event_action_timeout, |
341 | 28 | client_action_timeout, |
342 | 28 | max_executing_timeout, |
343 | 28 | timeout_operation_mux: Mutex::new(()), |
344 | 28 | weak_self: weak_self.clone(), |
345 | 28 | now_fn, |
346 | 28 | worker_registry, |
347 | 28 | }) |
348 | 28 | } |
349 | | |
350 | 3 | pub async fn should_timeout_operation(&self, awaited_action: &AwaitedAction) -> bool { |
351 | 3 | if !matches!0 (awaited_action.state().stage, ActionStage::Executing) { |
352 | 0 | return false; |
353 | 3 | } |
354 | | |
355 | 3 | let now = (self.now_fn)().now(); |
356 | | |
357 | | // Honor the per-action `Action.timeout` from the RBE protocol as a |
358 | | // backend wall-clock deadline. Without this, the only enforcement is |
359 | | // the Bazel client's --test_timeout, which surfaces as TIMEOUT/NO |
360 | | // STATUS instead of a backend signal pointing at the worker. |
361 | 3 | let action_timeout = awaited_action.action_info().timeout; |
362 | 3 | if action_timeout > Duration::ZERO { |
363 | 3 | let executing_started_at = awaited_action.state().last_transition_timestamp; |
364 | 3 | if let Ok(elapsed) = now.duration_since(executing_started_at) |
365 | 3 | && elapsed > action_timeout |
366 | | { |
367 | 1 | return true; |
368 | 2 | } |
369 | 0 | } |
370 | | |
371 | 2 | let registry_alive = if let Some(ref worker_registry1 ) = self.worker_registry { |
372 | 1 | if let Some(worker_id) = awaited_action.worker_id() { |
373 | 1 | worker_registry |
374 | 1 | .is_worker_alive(worker_id, self.no_event_action_timeout, now) |
375 | 1 | .await |
376 | | } else { |
377 | 0 | false |
378 | | } |
379 | | } else { |
380 | 1 | false |
381 | | }; |
382 | | |
383 | 2 | if registry_alive { |
384 | 0 | if self.max_executing_timeout > Duration::ZERO { |
385 | 0 | let last_update = awaited_action.last_worker_updated_timestamp(); |
386 | 0 | if let Ok(elapsed) = now.duration_since(last_update) { |
387 | 0 | return elapsed > self.max_executing_timeout; |
388 | 0 | } |
389 | 0 | } |
390 | 0 | return false; |
391 | 2 | } |
392 | | |
393 | 2 | let worker_should_update_before = awaited_action |
394 | 2 | .last_worker_updated_timestamp() |
395 | 2 | .checked_add(self.no_event_action_timeout) |
396 | 2 | .unwrap_or(now); |
397 | | |
398 | 2 | worker_should_update_before < now |
399 | 3 | } |
400 | | |
401 | 561 | async fn apply_filter_predicate( |
402 | 561 | &self, |
403 | 561 | awaited_action: &AwaitedAction, |
404 | 561 | subscriber: &T::Subscriber, |
405 | 561 | filter: &OperationFilter, |
406 | 561 | ) -> bool { |
407 | | // Note: The caller must filter `client_operation_id`. |
408 | | |
409 | 561 | let mut maybe_reloaded_awaited_action: Option<AwaitedAction> = None; |
410 | 561 | let now = (self.now_fn)().now(); |
411 | | |
412 | | // Check if client has timed out |
413 | 561 | if awaited_action.last_client_keepalive_timestamp() + self.client_action_timeout < now { |
414 | | // This may change if the version is out of date. |
415 | 0 | let mut timed_out = true; |
416 | 0 | if !awaited_action.state().stage.is_finished() { |
417 | 0 | let mut state = awaited_action.state().as_ref().clone(); |
418 | 0 | warn!(operation_id = ?awaited_action.operation_id(), timeout_secs = self.client_action_timeout.as_secs_f32(), "Operation timed out having no more clients listening"); |
419 | 0 | state.stage = ActionStage::Completed(ActionResult { |
420 | 0 | error: Some(make_err!( |
421 | 0 | Code::DeadlineExceeded, |
422 | 0 | "Operation timed out {} seconds of having no more clients listening", |
423 | 0 | self.client_action_timeout.as_secs_f32(), |
424 | 0 | )), |
425 | 0 | ..ActionResult::default() |
426 | 0 | }); |
427 | 0 | state.last_transition_timestamp = now; |
428 | 0 | let state = Arc::new(state); |
429 | | // We may be competing with an client timestamp update, so try |
430 | | // this a few times. |
431 | 0 | for attempt in 1..=MAX_UPDATE_RETRIES { |
432 | 0 | let mut new_awaited_action = match &maybe_reloaded_awaited_action { |
433 | 0 | None => awaited_action.clone(), |
434 | 0 | Some(reloaded_awaited_action) => reloaded_awaited_action.clone(), |
435 | | }; |
436 | 0 | new_awaited_action.worker_set_state(state.clone(), (self.now_fn)().now()); |
437 | 0 | let err = match self |
438 | 0 | .action_db |
439 | 0 | .update_awaited_action(new_awaited_action) |
440 | 0 | .await |
441 | | { |
442 | 0 | Ok(()) => break, |
443 | 0 | Err(err) => err, |
444 | | }; |
445 | | // Reload from the database if the action was outdated. |
446 | 0 | let maybe_awaited_action = |
447 | 0 | if attempt == MAX_UPDATE_RETRIES || err.code != Code::Aborted { |
448 | 0 | None |
449 | | } else { |
450 | 0 | subscriber.borrow().await.ok() |
451 | | }; |
452 | 0 | if let Some(reloaded_awaited_action) = maybe_awaited_action { |
453 | 0 | maybe_reloaded_awaited_action = Some(reloaded_awaited_action); |
454 | 0 | } else { |
455 | 0 | warn!( |
456 | | "Failed to update action to timed out state after client keepalive timeout. This is ok if multiple schedulers tried to set the state at the same time: {err}", |
457 | | ); |
458 | 0 | break; |
459 | | } |
460 | | // Re-check the predicate after reload. |
461 | 0 | if maybe_reloaded_awaited_action |
462 | 0 | .as_ref() |
463 | 0 | .is_some_and(|awaited_action| { |
464 | 0 | awaited_action.last_client_keepalive_timestamp() |
465 | 0 | + self.client_action_timeout |
466 | 0 | >= (self.now_fn)().now() |
467 | 0 | }) |
468 | | { |
469 | 0 | timed_out = false; |
470 | 0 | break; |
471 | 0 | } else if maybe_reloaded_awaited_action |
472 | 0 | .as_ref() |
473 | 0 | .is_some_and(|awaited_action| awaited_action.state().stage.is_finished()) |
474 | | { |
475 | 0 | break; |
476 | 0 | } |
477 | | } |
478 | 0 | } |
479 | 0 | if timed_out { |
480 | 0 | return false; |
481 | 0 | } |
482 | 561 | } |
483 | | // If the action was reloaded, then use that for the rest of the checks |
484 | | // instead of the input parameter. |
485 | 561 | let awaited_action = maybe_reloaded_awaited_action |
486 | 561 | .as_ref() |
487 | 561 | .unwrap_or(awaited_action); |
488 | | |
489 | 561 | if let Some(operation_id0 ) = &filter.operation_id |
490 | 0 | && operation_id != awaited_action.operation_id() |
491 | | { |
492 | 0 | return false; |
493 | 561 | } |
494 | | |
495 | 561 | if filter.worker_id.is_some() && filter.worker_id.as_ref()0 != awaited_action.worker_id() { |
496 | 0 | return false; |
497 | 561 | } |
498 | | |
499 | | { |
500 | 561 | if let Some(filter_unique_key0 ) = &filter.unique_key { |
501 | 0 | match &awaited_action.action_info().unique_qualifier { |
502 | 0 | ActionUniqueQualifier::Cacheable(unique_key) => { |
503 | 0 | if filter_unique_key != unique_key { |
504 | 0 | return false; |
505 | 0 | } |
506 | | } |
507 | | ActionUniqueQualifier::Uncacheable(_) => { |
508 | 0 | return false; |
509 | | } |
510 | | } |
511 | 561 | } |
512 | 561 | if let Some(action_digest0 ) = filter.action_digest |
513 | 0 | && action_digest != awaited_action.action_info().digest() |
514 | | { |
515 | 0 | return false; |
516 | 561 | } |
517 | | } |
518 | | |
519 | | { |
520 | 561 | let last_worker_update_timestamp = awaited_action.last_worker_updated_timestamp(); |
521 | 561 | if let Some(worker_update_before0 ) = filter.worker_update_before |
522 | 0 | && worker_update_before < last_worker_update_timestamp |
523 | | { |
524 | 0 | return false; |
525 | 561 | } |
526 | 561 | if let Some(completed_before0 ) = filter.completed_before |
527 | 0 | && awaited_action.state().stage.is_finished() |
528 | 0 | && completed_before < last_worker_update_timestamp |
529 | | { |
530 | 0 | return false; |
531 | 561 | } |
532 | 561 | if filter.stages != OperationStageFlags::Any { |
533 | 557 | let stage_flag = match awaited_action.state().stage { |
534 | 0 | ActionStage::Unknown => OperationStageFlags::Any, |
535 | 0 | ActionStage::CacheCheck => OperationStageFlags::CacheCheck, |
536 | 557 | ActionStage::Queued => OperationStageFlags::Queued, |
537 | 0 | ActionStage::Executing => OperationStageFlags::Executing, |
538 | | ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => { |
539 | 0 | OperationStageFlags::Completed |
540 | | } |
541 | | }; |
542 | 557 | if !filter.stages.intersects(stage_flag) { |
543 | 0 | return false; |
544 | 557 | } |
545 | 4 | } |
546 | | } |
547 | | |
548 | 561 | true |
549 | 561 | } |
550 | | |
551 | | /// Let the scheduler know that an operation has timed out from |
552 | | /// the client side (ie: worker has not updated in a while). |
553 | 1 | async fn timeout_operation_id(&self, operation_id: &OperationId) -> Result<(), Error> { |
554 | | // Ensure that only one timeout operation is running at a time. |
555 | | // Failing to do this could result in the same operation being |
556 | | // timed out multiple times at the same time. |
557 | | // Note: We could implement this on a per-operation_id basis, but it is quite |
558 | | // complex to manage the locks. |
559 | 1 | let _lock = self.timeout_operation_mux.lock().await; |
560 | | |
561 | 1 | let awaited_action_subscriber = self |
562 | 1 | .action_db |
563 | 1 | .get_by_operation_id(operation_id) |
564 | 1 | .await |
565 | 1 | .err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")?0 |
566 | 1 | .err_tip(|| {0 |
567 | 0 | format!("Operation id {operation_id} does not exist in SimpleSchedulerStateManager::timeout_operation_id") |
568 | 0 | })?; |
569 | | |
570 | 1 | let awaited_action = awaited_action_subscriber |
571 | 1 | .borrow() |
572 | 1 | .await |
573 | 1 | .err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")?0 ; |
574 | | |
575 | | // If the action is not executing, we should not timeout the action. |
576 | 1 | if !matches!0 (awaited_action.state().stage, ActionStage::Executing) { |
577 | 0 | return Ok(()); |
578 | 1 | } |
579 | | |
580 | 1 | let now = (self.now_fn)().now(); |
581 | | |
582 | | // Check worker liveness via registry if available. |
583 | 1 | let registry_alive = if let Some(ref worker_registry) = self.worker_registry { |
584 | 1 | if let Some(worker_id) = awaited_action.worker_id() { |
585 | 1 | worker_registry |
586 | 1 | .is_worker_alive(worker_id, self.no_event_action_timeout, now) |
587 | 1 | .await |
588 | | } else { |
589 | 0 | false |
590 | | } |
591 | | } else { |
592 | 0 | false |
593 | | }; |
594 | | |
595 | 1 | let timestamp_alive = { |
596 | 1 | let worker_should_update_before = awaited_action |
597 | 1 | .last_worker_updated_timestamp() |
598 | 1 | .checked_add(self.no_event_action_timeout) |
599 | 1 | .unwrap_or(now); |
600 | 1 | worker_should_update_before >= now |
601 | | }; |
602 | | |
603 | 1 | if registry_alive || timestamp_alive { |
604 | 0 | trace!( |
605 | | %operation_id, |
606 | 0 | worker_id = ?awaited_action.worker_id(), |
607 | | registry_alive, |
608 | | timestamp_alive, |
609 | | "Worker is alive, operation not timed out" |
610 | | ); |
611 | 0 | return Ok(()); |
612 | 1 | } |
613 | | |
614 | 1 | warn!( |
615 | | %operation_id, |
616 | 1 | worker_id = ?awaited_action.worker_id(), |
617 | | registry_alive, |
618 | | timestamp_alive, |
619 | | "Worker not alive via registry or timestamp, timing out operation" |
620 | | ); |
621 | | |
622 | 1 | self.assign_operation( |
623 | 1 | operation_id, |
624 | 1 | Err(make_err!( |
625 | 1 | Code::DeadlineExceeded, |
626 | 1 | "Operation timed out after {} seconds", |
627 | 1 | self.no_event_action_timeout.as_secs_f32(), |
628 | 1 | )), |
629 | 1 | ) |
630 | 1 | .await |
631 | 1 | } |
632 | | |
633 | 58 | async fn inner_update_operation( |
634 | 58 | &self, |
635 | 58 | operation_id: &OperationId, |
636 | 58 | maybe_worker_id: Option<&WorkerId>, |
637 | 58 | update: UpdateOperationType, |
638 | 58 | ) -> Result<(), Error> { |
639 | 58 | let update_type_str = match &update { |
640 | 0 | UpdateOperationType::KeepAlive => "KeepAlive", |
641 | 43 | UpdateOperationType::UpdateWithActionStage(stage) => match stage { |
642 | 0 | ActionStage::Queued => "Stage:Queued", |
643 | 38 | ActionStage::Executing => "Stage:Executing", |
644 | 5 | ActionStage::Completed(_) => "Stage:Completed", |
645 | 0 | ActionStage::CompletedFromCache(_) => "Stage:CompletedFromCache", |
646 | 0 | ActionStage::CacheCheck => "Stage:CacheCheck", |
647 | 0 | ActionStage::Unknown => "Stage:Unknown", |
648 | | }, |
649 | 12 | UpdateOperationType::UpdateWithError(_) => "Error", |
650 | 2 | UpdateOperationType::UpdateWithDisconnect => "Disconnect", |
651 | 1 | UpdateOperationType::ExecutionComplete => "ExecutionComplete", |
652 | | }; |
653 | | |
654 | 58 | debug!( |
655 | | %operation_id, |
656 | | ?maybe_worker_id, |
657 | | update_type = %update_type_str, |
658 | | "inner_update_operation START" |
659 | | ); |
660 | | |
661 | 58 | let mut last_err = None; |
662 | 58 | let mut retry_count = 0; |
663 | 58 | for _ in 0..MAX_UPDATE_RETRIES { |
664 | 62 | retry_count += 1; |
665 | 62 | if retry_count > 1 { |
666 | 4 | let base_delay = BASE_RETRY_DELAY_MS * (1 << (retry_count - 2).min(4)); |
667 | 4 | let jitter = std::time::SystemTime::now() |
668 | 4 | .duration_since(std::time::UNIX_EPOCH) |
669 | 4 | .map_or(0, |d| { |
670 | 4 | u64::try_from(d.as_nanos()).expect("u64 error") % MAX_RETRY_JITTER_MS |
671 | 4 | }); |
672 | 4 | let delay = Duration::from_millis(base_delay + jitter); |
673 | | |
674 | 4 | warn!( |
675 | | %operation_id, |
676 | | ?maybe_worker_id, |
677 | | retry_count, |
678 | 4 | delay_ms = delay.as_millis(), |
679 | | update_type = %update_type_str, |
680 | | "Retrying operation update due to version conflict (with backoff)" |
681 | | ); |
682 | | |
683 | 4 | tokio::time::sleep(delay).await; |
684 | 58 | } |
685 | 62 | let maybe_awaited_action_subscriber = self |
686 | 62 | .action_db |
687 | 62 | .get_by_operation_id(operation_id) |
688 | 62 | .await |
689 | 62 | .err_tip(|| "In SimpleSchedulerStateManager::update_operation")?0 ; |
690 | 62 | let Some(awaited_action_subscriber60 ) = maybe_awaited_action_subscriber else { |
691 | | // No action found. It is ok if the action was not found. It |
692 | | // probably means that the action was dropped, but worker was |
693 | | // still processing it. |
694 | 2 | warn!( |
695 | | %operation_id, |
696 | | "Unable to update action due to it being missing, probably dropped" |
697 | | ); |
698 | 2 | return Ok(()); |
699 | | }; |
700 | | |
701 | 60 | let mut awaited_action = awaited_action_subscriber |
702 | 60 | .borrow() |
703 | 60 | .await |
704 | 60 | .err_tip(|| "In SimpleSchedulerStateManager::update_operation")?0 ; |
705 | | |
706 | | // Make sure the worker id matches the awaited action worker id. |
707 | | // This might happen if the worker sending the update is not the |
708 | | // worker that was assigned. |
709 | 60 | if awaited_action.worker_id().is_some() |
710 | 23 | && maybe_worker_id.is_some() |
711 | 22 | && maybe_worker_id != awaited_action.worker_id() |
712 | | { |
713 | | // If another worker is already assigned to the action, another |
714 | | // worker probably picked up the action. We should not update the |
715 | | // action in this case and abort this operation. |
716 | 0 | let err = make_err!( |
717 | 0 | Code::Aborted, |
718 | | "Worker ids do not match - {:?} != {:?} for {:?}", |
719 | | maybe_worker_id, |
720 | 0 | awaited_action.worker_id(), |
721 | | awaited_action, |
722 | | ); |
723 | 0 | info!( |
724 | | "Worker ids do not match - {:?} != {:?} for {:?}. This is probably due to another worker picking up the action.", |
725 | | maybe_worker_id, |
726 | 0 | awaited_action.worker_id(), |
727 | | awaited_action, |
728 | | ); |
729 | 0 | return Err(err); |
730 | 60 | } |
731 | | |
732 | | // Make sure we don't update an action that is already completed. |
733 | 60 | if awaited_action.state().stage.is_finished() { |
734 | 0 | match &update { |
735 | | UpdateOperationType::UpdateWithDisconnect | UpdateOperationType::KeepAlive => { |
736 | | // No need to error a keep-alive when it's completed, it's just |
737 | | // unnecessary log noise. |
738 | 0 | return Ok(()); |
739 | | } |
740 | | _ => { |
741 | 0 | return Err(make_err!( |
742 | 0 | Code::Internal, |
743 | 0 | "Action {operation_id} is already completed with state {:?} - maybe_worker_id: {:?}", |
744 | 0 | awaited_action.state().stage, |
745 | 0 | maybe_worker_id, |
746 | 0 | )); |
747 | | } |
748 | | } |
749 | 60 | } |
750 | | |
751 | 60 | let stage56 = match &update { |
752 | | UpdateOperationType::KeepAlive => { |
753 | 0 | awaited_action.worker_keep_alive((self.now_fn)().now()); |
754 | 0 | match self |
755 | 0 | .action_db |
756 | 0 | .update_awaited_action(awaited_action) |
757 | 0 | .await |
758 | 0 | .err_tip(|| "Failed to send KeepAlive in SimpleSchedulerStateManager::update_operation") { |
759 | | // Try again if there was a version mismatch. |
760 | 0 | Err(err) if err.code == Code::Aborted => { |
761 | 0 | last_err = Some(err); |
762 | 0 | continue; |
763 | | } |
764 | 0 | result => return result, |
765 | | } |
766 | | } |
767 | 46 | UpdateOperationType::UpdateWithActionStage(stage) => { |
768 | 46 | if stage == &ActionStage::Executing |
769 | 41 | && awaited_action.state().stage == ActionStage::Executing |
770 | | { |
771 | 4 | warn!(state = ?awaited_action.state(), "Action already assigned"); |
772 | 4 | return Err(make_err!(Code::Aborted, "Action already assigned")); |
773 | 42 | } |
774 | 42 | stage.clone() |
775 | | } |
776 | 12 | UpdateOperationType::UpdateWithError(err) => { |
777 | | // Don't count a backpressure failure as an attempt for an action. |
778 | 12 | let due_to_backpressure = err.code == Code::ResourceExhausted; |
779 | 12 | if !due_to_backpressure { |
780 | 12 | awaited_action.attempts += 1; |
781 | 12 | }0 |
782 | | |
783 | 12 | if awaited_action.attempts > self.max_job_retries { |
784 | 2 | ActionStage::Completed(ActionResult { |
785 | 2 | execution_metadata: ExecutionMetadata { |
786 | 2 | worker: maybe_worker_id.map_or_else(String::default, ToString::to_string), |
787 | 2 | ..ExecutionMetadata::default() |
788 | 2 | }, |
789 | 2 | error: Some(err.clone().merge(make_err!( |
790 | 2 | Code::Internal, |
791 | 2 | "Job cancelled because it attempted to execute too many times {} > {} times {}", |
792 | 2 | awaited_action.attempts, |
793 | 2 | self.max_job_retries, |
794 | 2 | format!("for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"), |
795 | 2 | ))), |
796 | 2 | ..ActionResult::default() |
797 | 2 | }) |
798 | | } else { |
799 | 10 | ActionStage::Queued |
800 | | } |
801 | | } |
802 | | UpdateOperationType::UpdateWithDisconnect => { |
803 | | // A worker disconnect (e.g. OOMKill, pod eviction, network |
804 | | // drop) used to requeue without counting as an attempt, |
805 | | // which let an action that always crashes its worker loop |
806 | | // forever until the Bazel client's --test_timeout fired. |
807 | | // Count disconnects as attempts so max_job_retries caps the |
808 | | // loop and the client sees a backend-attributable error. |
809 | 2 | awaited_action.attempts += 1; |
810 | | |
811 | 2 | if awaited_action.attempts > self.max_job_retries { |
812 | 1 | ActionStage::Completed(ActionResult { |
813 | 1 | execution_metadata: ExecutionMetadata { |
814 | 1 | worker: maybe_worker_id |
815 | 1 | .map_or_else(String::default, ToString::to_string), |
816 | 1 | ..ExecutionMetadata::default() |
817 | 1 | }, |
818 | 1 | error: Some(make_err!( |
819 | 1 | Code::Internal, |
820 | 1 | "Worker disconnected repeatedly while executing this action ({} > {} attempts); the runner likely OOMKilled or the pod was evicted. {}", |
821 | 1 | awaited_action.attempts, |
822 | 1 | self.max_job_retries, |
823 | 1 | format!( |
824 | 1 | "for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}" |
825 | 1 | ), |
826 | 1 | )), |
827 | 1 | ..ActionResult::default() |
828 | 1 | }) |
829 | | } else { |
830 | 1 | ActionStage::Queued |
831 | | } |
832 | | } |
833 | | // We shouldn't get here, but we just ignore it if we do. |
834 | | UpdateOperationType::ExecutionComplete => { |
835 | 0 | warn!("inner_update_operation got an ExecutionComplete, that's unexpected."); |
836 | 0 | return Ok(()); |
837 | | } |
838 | | }; |
839 | 56 | let now = (self.now_fn)().now(); |
840 | 56 | if matches!45 (stage, ActionStage::Queued) { |
841 | 11 | // If the action is queued, we need to unset the worker id regardless of |
842 | 11 | // which worker sent the update. |
843 | 11 | awaited_action.set_worker_id(None, now); |
844 | 45 | } else { |
845 | 45 | awaited_action.set_worker_id(maybe_worker_id.cloned(), now); |
846 | 45 | } |
847 | 56 | awaited_action.worker_set_state( |
848 | 56 | Arc::new(ActionState { |
849 | 56 | stage, |
850 | 56 | // Client id is not known here, it is the responsibility of |
851 | 56 | // the subscriber impl to replace this with the |
852 | 56 | // correct client id. |
853 | 56 | client_operation_id: operation_id.clone(), |
854 | 56 | action_digest: awaited_action.action_info().digest(), |
855 | 56 | last_transition_timestamp: now, |
856 | 56 | }), |
857 | 56 | now, |
858 | | ); |
859 | | |
860 | 56 | let update_action_result = self |
861 | 56 | .action_db |
862 | 56 | .update_awaited_action(awaited_action.clone()) |
863 | 56 | .await |
864 | 56 | .err_tip(|| "In SimpleSchedulerStateManager::update_operation"); |
865 | 56 | if let Err(err5 ) = update_action_result { |
866 | | // We use Aborted to signal that the action was not |
867 | | // updated because the data being set was not the latest, |
868 | | // but the update can be retried. |
869 | 5 | if err.code == Code::Aborted { |
870 | 4 | debug!( |
871 | | %operation_id, |
872 | | retry_count, |
873 | | update_type = %update_type_str, |
874 | | "Version conflict (Aborted), will retry" |
875 | | ); |
876 | 4 | last_err = Some(err); |
877 | 4 | continue; |
878 | 1 | } |
879 | 1 | warn!( |
880 | | %operation_id, |
881 | | update_type = %update_type_str, |
882 | | ?err, |
883 | | "inner_update_operation FAILED (non-retryable)" |
884 | | ); |
885 | 1 | return Err(err); |
886 | 51 | } |
887 | | |
888 | | // Record execution metrics after successful state update |
889 | 51 | let action_state = awaited_action.state(); |
890 | 51 | let instance_name = awaited_action |
891 | 51 | .action_info() |
892 | 51 | .unique_qualifier |
893 | 51 | .instance_name() |
894 | 51 | .as_str(); |
895 | 51 | let worker_id = awaited_action |
896 | 51 | .worker_id() |
897 | 51 | .map(std::string::ToString::to_string); |
898 | 51 | let priority = Some(awaited_action.action_info().priority); |
899 | | |
900 | | // Build base attributes for metrics |
901 | 51 | let mut attrs = nativelink_util::metrics::make_execution_attributes( |
902 | 51 | instance_name, |
903 | 51 | worker_id.as_deref(), |
904 | 51 | priority, |
905 | | ); |
906 | | |
907 | | // Add stage attribute |
908 | 51 | let execution_stage: ExecutionStage = (&action_state.stage).into(); |
909 | 51 | attrs.push(KeyValue::new(EXECUTION_STAGE, execution_stage)); |
910 | | |
911 | | // Record stage transition |
912 | 51 | EXECUTION_METRICS.execution_stage_transitions.add(1, &attrs); |
913 | | |
914 | | // For completed actions, record the completion count with result |
915 | 51 | match &action_state.stage { |
916 | 8 | ActionStage::Completed(action_result) => { |
917 | 8 | let result = if action_result.exit_code == 0 { |
918 | 5 | ExecutionResult::Success |
919 | | } else { |
920 | 3 | ExecutionResult::Failure |
921 | | }; |
922 | 8 | attrs.push(KeyValue::new(EXECUTION_RESULT, result)); |
923 | 8 | EXECUTION_METRICS.execution_completed_count.add(1, &attrs); |
924 | | } |
925 | 0 | ActionStage::CompletedFromCache(_) => { |
926 | 0 | attrs.push(KeyValue::new(EXECUTION_RESULT, ExecutionResult::CacheHit)); |
927 | 0 | EXECUTION_METRICS.execution_completed_count.add(1, &attrs); |
928 | 0 | } |
929 | 43 | _ => {} |
930 | | } |
931 | | |
932 | 51 | debug!( |
933 | | %operation_id, |
934 | | retry_count, |
935 | | update_type = %update_type_str, |
936 | | "inner_update_operation SUCCESS" |
937 | | ); |
938 | 51 | return Ok(()); |
939 | | } |
940 | | |
941 | 0 | warn!( |
942 | | %operation_id, |
943 | | update_type = %update_type_str, |
944 | | retry_count = MAX_UPDATE_RETRIES, |
945 | | "inner_update_operation EXHAUSTED all retries" |
946 | | ); |
947 | 0 | Err(last_err.unwrap_or_else(|| { |
948 | 0 | make_err!( |
949 | 0 | Code::Internal, |
950 | | "Failed to update action after {} retries with no error set", |
951 | | MAX_UPDATE_RETRIES, |
952 | | ) |
953 | 0 | })) |
954 | 58 | } |
955 | | |
956 | 32 | async fn inner_add_operation( |
957 | 32 | &self, |
958 | 32 | new_client_operation_id: OperationId, |
959 | 32 | action_info: Arc<ActionInfo>, |
960 | 32 | ) -> Result<T::Subscriber, Error> { |
961 | 32 | self.action_db |
962 | 32 | .add_action( |
963 | 32 | new_client_operation_id, |
964 | 32 | action_info, |
965 | 32 | self.no_event_action_timeout, |
966 | 32 | ) |
967 | 32 | .await |
968 | 32 | .err_tip(|| "In SimpleSchedulerStateManager::add_operation") |
969 | 32 | } |
970 | | |
971 | 626 | async fn inner_filter_operations<'a, F>( |
972 | 626 | &'a self, |
973 | 626 | filter: OperationFilter, |
974 | 626 | to_action_state_result: F, |
975 | 626 | ) -> Result<ActionStateResultStream<'a>, Error> |
976 | 626 | where |
977 | 626 | F: Fn(T::Subscriber) -> Box<dyn ActionStateResult> + Send + Sync + 'a, |
978 | 626 | { |
979 | 622 | const fn sorted_awaited_action_state_for_flags( |
980 | 622 | stage: OperationStageFlags, |
981 | 622 | ) -> Option<SortedAwaitedActionState> { |
982 | 622 | match stage { |
983 | 0 | OperationStageFlags::CacheCheck => Some(SortedAwaitedActionState::CacheCheck), |
984 | 620 | OperationStageFlags::Queued => Some(SortedAwaitedActionState::Queued), |
985 | 0 | OperationStageFlags::Executing => Some(SortedAwaitedActionState::Executing), |
986 | 0 | OperationStageFlags::Completed => Some(SortedAwaitedActionState::Completed), |
987 | 2 | _ => None, |
988 | | } |
989 | 622 | } |
990 | | |
991 | 626 | if let Some(operation_id0 ) = &filter.operation_id { |
992 | 0 | let maybe_subscriber = self |
993 | 0 | .action_db |
994 | 0 | .get_by_operation_id(operation_id) |
995 | 0 | .await |
996 | 0 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?; |
997 | 0 | let Some(subscriber) = maybe_subscriber else { |
998 | 0 | return Ok(Box::pin(stream::empty())); |
999 | | }; |
1000 | 0 | let awaited_action = subscriber |
1001 | 0 | .borrow() |
1002 | 0 | .await |
1003 | 0 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?; |
1004 | 0 | if !self |
1005 | 0 | .apply_filter_predicate(&awaited_action, &subscriber, &filter) |
1006 | 0 | .await |
1007 | | { |
1008 | 0 | return Ok(Box::pin(stream::empty())); |
1009 | 0 | } |
1010 | 0 | return Ok(Box::pin(stream::once(async move { |
1011 | 0 | to_action_state_result(subscriber) |
1012 | 0 | }))); |
1013 | 626 | } |
1014 | 626 | if let Some(client_operation_id4 ) = &filter.client_operation_id { |
1015 | 4 | let maybe_subscriber = self |
1016 | 4 | .action_db |
1017 | 4 | .get_awaited_action_by_id(client_operation_id) |
1018 | 4 | .await |
1019 | 4 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?0 ; |
1020 | 4 | let Some(subscriber) = maybe_subscriber else { |
1021 | 0 | return Ok(Box::pin(stream::empty())); |
1022 | | }; |
1023 | 4 | let awaited_action = subscriber |
1024 | 4 | .borrow() |
1025 | 4 | .await |
1026 | 4 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?0 ; |
1027 | 4 | if !self |
1028 | 4 | .apply_filter_predicate(&awaited_action, &subscriber, &filter) |
1029 | 4 | .await |
1030 | | { |
1031 | 0 | return Ok(Box::pin(stream::empty())); |
1032 | 4 | } |
1033 | 4 | return Ok(Box::pin(stream::once(async move { |
1034 | 4 | to_action_state_result(subscriber) |
1035 | 4 | }))); |
1036 | 622 | } |
1037 | | |
1038 | 620 | let Some(sorted_awaited_action_state) = |
1039 | 622 | sorted_awaited_action_state_for_flags(filter.stages) |
1040 | | else { |
1041 | 2 | let mut all_items: Vec<_> = self |
1042 | 2 | .action_db |
1043 | 2 | .get_all_awaited_actions() |
1044 | 2 | .await |
1045 | 2 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?0 |
1046 | 2 | .and_then(|awaited_action_subscriber| async move {0 |
1047 | 0 | let awaited_action = awaited_action_subscriber |
1048 | 0 | .borrow() |
1049 | 0 | .await |
1050 | 0 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?; |
1051 | 0 | Ok((awaited_action_subscriber, awaited_action)) |
1052 | 0 | }) |
1053 | 2 | .try_filter_map(|(subscriber, awaited_action)| {0 |
1054 | 0 | let filter = filter.clone(); |
1055 | 0 | async move { |
1056 | 0 | Ok(self |
1057 | 0 | .apply_filter_predicate(&awaited_action, &subscriber, &filter) |
1058 | 0 | .await |
1059 | 0 | .then_some((subscriber, awaited_action.sort_key()))) |
1060 | 0 | } |
1061 | 0 | }) |
1062 | 2 | .try_collect() |
1063 | 2 | .await |
1064 | 2 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?0 ; |
1065 | | |
1066 | | #[allow(clippy::unnecessary_sort_by)] |
1067 | 0 | match filter.order_by_priority_direction { |
1068 | 0 | Some(OrderDirection::Asc) => all_items.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)), |
1069 | 0 | Some(OrderDirection::Desc) => all_items.sort_unstable_by(|(_, a), (_, b)| b.cmp(a)), |
1070 | 2 | None => {} |
1071 | | } |
1072 | 2 | return Ok(Box::pin(stream::iter( |
1073 | 2 | all_items |
1074 | 2 | .into_iter() |
1075 | 2 | .map(move |(subscriber, _)| to_action_state_result0 (subscriber0 )), |
1076 | | ))); |
1077 | | }; |
1078 | | |
1079 | 620 | let desc = matches!500 ( |
1080 | 120 | filter.order_by_priority_direction, |
1081 | | Some(OrderDirection::Desc) |
1082 | | ); |
1083 | 620 | let stream = self |
1084 | 620 | .action_db |
1085 | 620 | .get_range_of_actions( |
1086 | 620 | sorted_awaited_action_state, |
1087 | 620 | Bound::Unbounded, |
1088 | 620 | Bound::Unbounded, |
1089 | 620 | desc, |
1090 | 620 | ) |
1091 | 620 | .await |
1092 | 620 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?0 |
1093 | 620 | .and_then(|awaited_action_subscriber| async move {557 |
1094 | 557 | let awaited_action = awaited_action_subscriber |
1095 | 557 | .borrow() |
1096 | 557 | .await |
1097 | 557 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?0 ; |
1098 | 557 | Ok((awaited_action_subscriber, awaited_action)) |
1099 | 1.11k | }) |
1100 | 620 | .try_filter_map(move |(subscriber, awaited_action)| {557 |
1101 | 557 | let filter = filter.clone(); |
1102 | 557 | async move { |
1103 | 557 | Ok(self |
1104 | 557 | .apply_filter_predicate(&awaited_action, &subscriber, &filter) |
1105 | 557 | .await |
1106 | 557 | .then_some(subscriber)) |
1107 | 557 | } |
1108 | 557 | }) |
1109 | 620 | .map(move |result| -> Box<dyn ActionStateResult> {557 |
1110 | 557 | result.map_or_else( |
1111 | 0 | |e| -> Box<dyn ActionStateResult> { Box::new(ErrorActionStateResult(e)) }, |
1112 | 557 | |v| -> Box<dyn ActionStateResult> { to_action_state_result(v) }, |
1113 | | ) |
1114 | 557 | }); |
1115 | 620 | Ok(Box::pin(stream)) |
1116 | 626 | } |
1117 | | } |
1118 | | |
1119 | | #[async_trait] |
1120 | | impl<T, I, NowFn> ClientStateManager for SimpleSchedulerStateManager<T, I, NowFn> |
1121 | | where |
1122 | | T: AwaitedActionDb, |
1123 | | I: InstantWrapper, |
1124 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
1125 | | { |
1126 | | async fn add_action( |
1127 | | &self, |
1128 | | client_operation_id: OperationId, |
1129 | | action_info: Arc<ActionInfo>, |
1130 | 32 | ) -> Result<Box<dyn ActionStateResult>, Error> { |
1131 | | let sub = self |
1132 | | .inner_add_operation(client_operation_id, action_info.clone()) |
1133 | | .await?; |
1134 | | |
1135 | | Ok(Box::new(ClientActionStateResult::new( |
1136 | | sub, |
1137 | | self.weak_self.clone(), |
1138 | | self.no_event_action_timeout, |
1139 | | self.now_fn.clone(), |
1140 | | ))) |
1141 | 32 | } |
1142 | | |
1143 | | async fn filter_operations<'a>( |
1144 | | &'a self, |
1145 | | filter: OperationFilter, |
1146 | 504 | ) -> Result<ActionStateResultStream<'a>, Error> { |
1147 | 504 | self.inner_filter_operations(filter, move |rx| { |
1148 | 504 | Box::new(ClientActionStateResult::new( |
1149 | 504 | rx, |
1150 | 504 | self.weak_self.clone(), |
1151 | 504 | self.no_event_action_timeout, |
1152 | 504 | self.now_fn.clone(), |
1153 | 504 | )) |
1154 | 504 | }) |
1155 | | .await |
1156 | 504 | } |
1157 | | |
1158 | 0 | fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> { |
1159 | 0 | None |
1160 | 0 | } |
1161 | | } |
1162 | | |
1163 | | #[async_trait] |
1164 | | impl<T, I, NowFn> WorkerStateManager for SimpleSchedulerStateManager<T, I, NowFn> |
1165 | | where |
1166 | | T: AwaitedActionDb, |
1167 | | I: InstantWrapper, |
1168 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
1169 | | { |
1170 | | async fn update_operation( |
1171 | | &self, |
1172 | | operation_id: &OperationId, |
1173 | | worker_id: &WorkerId, |
1174 | | update: UpdateOperationType, |
1175 | 19 | ) -> Result<(), Error> { |
1176 | | self.inner_update_operation(operation_id, Some(worker_id), update) |
1177 | | .await |
1178 | 19 | } |
1179 | | } |
1180 | | |
1181 | | #[async_trait] |
1182 | | impl<T, I, NowFn> MatchingEngineStateManager for SimpleSchedulerStateManager<T, I, NowFn> |
1183 | | where |
1184 | | T: AwaitedActionDb, |
1185 | | I: InstantWrapper, |
1186 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
1187 | | { |
1188 | | async fn filter_operations<'a>( |
1189 | | &'a self, |
1190 | | filter: OperationFilter, |
1191 | 122 | ) -> Result<ActionStateResultStream<'a>, Error> { |
1192 | 57 | self.inner_filter_operations(filter, |rx| { |
1193 | 57 | Box::new(MatchingEngineActionStateResult::new( |
1194 | 57 | rx, |
1195 | 57 | self.weak_self.clone(), |
1196 | 57 | self.no_event_action_timeout, |
1197 | 57 | self.now_fn.clone(), |
1198 | 57 | )) |
1199 | 57 | }) |
1200 | | .await |
1201 | 122 | } |
1202 | | |
1203 | | async fn assign_operation( |
1204 | | &self, |
1205 | | operation_id: &OperationId, |
1206 | | worker_id_or_reason_for_unassign: Result<&WorkerId, Error>, |
1207 | 39 | ) -> Result<(), Error> { |
1208 | | let (maybe_worker_id, update) = match worker_id_or_reason_for_unassign { |
1209 | | Ok(worker_id) => ( |
1210 | | Some(worker_id), |
1211 | | UpdateOperationType::UpdateWithActionStage(ActionStage::Executing), |
1212 | | ), |
1213 | | Err(err) => (None, UpdateOperationType::UpdateWithError(err)), |
1214 | | }; |
1215 | | self.inner_update_operation(operation_id, maybe_worker_id, update) |
1216 | | .await |
1217 | 39 | } |
1218 | | } |