/build/source/nativelink-scheduler/src/simple_scheduler_state_manager.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::ops::Bound; |
16 | | use core::time::Duration; |
17 | | use std::string::ToString; |
18 | | use std::sync::{Arc, Weak}; |
19 | | |
20 | | use async_lock::Mutex; |
21 | | use async_trait::async_trait; |
22 | | use futures::{StreamExt, TryStreamExt, stream}; |
23 | | use nativelink_error::{Code, Error, ResultExt, make_err}; |
24 | | use nativelink_metric::MetricsComponent; |
25 | | use nativelink_util::action_messages::{ |
26 | | ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ExecutionMetadata, |
27 | | OperationId, WorkerId, |
28 | | }; |
29 | | use nativelink_util::instant_wrapper::InstantWrapper; |
30 | | use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; |
31 | | use nativelink_util::operation_state_manager::{ |
32 | | ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager, |
33 | | OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType, WorkerStateManager, |
34 | | }; |
35 | | use nativelink_util::origin_event::OriginMetadata; |
36 | | use tracing::{info, warn}; |
37 | | |
38 | | use super::awaited_action_db::{ |
39 | | AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedActionState, |
40 | | }; |
41 | | |
42 | | /// Maximum number of times an update to the database can fail before giving up. |
43 | | /// Also used in this file as the cap on no-event timeout rounds in `changed`. |
44 | | const MAX_UPDATE_RETRIES: usize = 5; |
45 | | |
46 | | /// Simple struct that implements the `ActionStateResult` trait and always returns an error: each trait method yields a clone of the wrapped `Error`. |
47 | | struct ErrorActionStateResult(Error); |
48 | | |
49 | | #[async_trait] |
50 | | impl ActionStateResult for ErrorActionStateResult { |
          // None of the methods below look at any action; each one fails
          // with a clone of the wrapped error (`self.0`).
51 | 0 | async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> { |
52 | | Err(self.0.clone()) |
53 | 0 | } |
54 | | |
55 | 0 | async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> { |
56 | | Err(self.0.clone()) |
57 | 0 | } |
58 | | |
59 | 0 | async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> { |
60 | | Err(self.0.clone()) |
61 | 0 | } |
62 | | } |
63 | | |
64 | | struct ClientActionStateResult<U, T, I, NowFn> |
          // Wrapper whose only field is a `MatchingEngineActionStateResult`
          // built from the same generic parameters.
65 | | where |
66 | | U: AwaitedActionSubscriber, |
67 | | T: AwaitedActionDb, |
68 | | I: InstantWrapper, |
69 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
70 | | { |
          // The wrapped result object.
71 | | inner: MatchingEngineActionStateResult<U, T, I, NowFn>, |
72 | | } |
73 | | |
74 | | impl<U, T, I, NowFn> ClientActionStateResult<U, T, I, NowFn> |
75 | | where |
76 | | U: AwaitedActionSubscriber, |
77 | | T: AwaitedActionDb, |
78 | | I: InstantWrapper, |
79 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
80 | | { |
          // Passes all four arguments, unchanged, to
          // `MatchingEngineActionStateResult::new` and stores the result in `inner`.
81 | 534 | const fn new( |
82 | 534 | sub: U, |
83 | 534 | simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>, |
84 | 534 | no_event_action_timeout: Duration, |
85 | 534 | now_fn: NowFn, |
86 | 534 | ) -> Self { |
87 | 534 | Self { |
88 | 534 | inner: MatchingEngineActionStateResult::new( |
89 | 534 | sub, |
90 | 534 | simple_scheduler_state_manager, |
91 | 534 | no_event_action_timeout, |
92 | 534 | now_fn, |
93 | 534 | ), |
94 | 534 | } |
95 | 534 | } |
96 | | } |
97 | | |
98 | | #[async_trait] |
99 | | impl<U, T, I, NowFn> ActionStateResult for ClientActionStateResult<U, T, I, NowFn> |
100 | | where |
101 | | U: AwaitedActionSubscriber, |
102 | | T: AwaitedActionDb, |
103 | | I: InstantWrapper, |
104 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
105 | | { |
          // Pure delegation: every method awaits the same-named method on
          // `self.inner` and returns its result unchanged.
106 | 6 | async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> { |
107 | | self.inner.as_state().await |
108 | 6 | } |
109 | | |
110 | 50 | async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> { |
111 | | self.inner.changed().await |
112 | 50 | } |
113 | | |
114 | 0 | async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> { |
115 | | self.inner.as_action_info().await |
116 | 0 | } |
117 | | } |
118 | | |
119 | | struct MatchingEngineActionStateResult<U, T, I, NowFn> |
120 | | where |
121 | | U: AwaitedActionSubscriber, |
122 | | T: AwaitedActionDb, |
123 | | I: InstantWrapper, |
124 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
125 | | { |
          // Subscriber whose `borrow()` / `changed()` supply the awaited
          // action to the `ActionStateResult` impl in this file.
126 | | awaited_action_sub: U, |
          // Weak handle to the `SimpleSchedulerStateManager`; `changed()`
          // upgrades it in order to call `timeout_operation_id`.
127 | | simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>, |
          // How long `changed()` sleeps waiting for a subscriber update
          // before it runs its timeout checks.
128 | | no_event_action_timeout: Duration, |
          // Produces the `InstantWrapper` whose `sleep` implements that wait.
129 | | now_fn: NowFn, |
130 | | } |
131 | | impl<U, T, I, NowFn> MatchingEngineActionStateResult<U, T, I, NowFn> |
132 | | where |
133 | | U: AwaitedActionSubscriber, |
134 | | T: AwaitedActionDb, |
135 | | I: InstantWrapper, |
136 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
137 | | { |
          // Plain field-by-field constructor; no validation or other work is done.
138 | 584 | const fn new( |
139 | 584 | awaited_action_sub: U, |
140 | 584 | simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>, |
141 | 584 | no_event_action_timeout: Duration, |
142 | 584 | now_fn: NowFn, |
143 | 584 | ) -> Self { |
144 | 584 | Self { |
145 | 584 | awaited_action_sub, |
146 | 584 | simple_scheduler_state_manager, |
147 | 584 | no_event_action_timeout, |
148 | 584 | now_fn, |
149 | 584 | } |
150 | 584 | } |
151 | | } |
152 | | |
153 | | #[async_trait] |
154 | | impl<U, T, I, NowFn> ActionStateResult for MatchingEngineActionStateResult<U, T, I, NowFn> |
155 | | where |
156 | | U: AwaitedActionSubscriber, |
157 | | T: AwaitedActionDb, |
158 | | I: InstantWrapper, |
159 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
160 | | { |
          // Borrows the current awaited action from the subscriber and returns
          // clones of its state and of its optional origin metadata.
161 | 38 | async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> { |
162 | | let awaited_action = self |
163 | | .awaited_action_sub |
164 | | .borrow() |
165 | | .await |
166 | | .err_tip(|| "In MatchingEngineActionStateResult::as_state")?; |
167 | | Ok(( |
168 | | awaited_action.state().clone(), |
169 | | awaited_action.maybe_origin_metadata().cloned(), |
170 | | )) |
171 | 38 | } |
172 | | |
          // Waits for the next subscriber update. Each loop iteration races
          // `awaited_action_sub.changed()` against a sleep of
          // `no_event_action_timeout`; a subscriber result (Ok or Err) is
          // returned immediately, while a timeout runs the checks below.
173 | 50 | async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> { |
174 | | let mut timeout_attempts = 0; |
175 | | loop { |
176 | | tokio::select! { |
177 | | awaited_action_result = self.awaited_action_sub.changed() => { |
178 | | return awaited_action_result |
179 | | .err_tip(|| "In MatchingEngineActionStateResult::changed") |
180 | 46 | .map(|v| (v.state().clone(), v.maybe_origin_metadata().cloned())); |
181 | | } |
182 | | () = (self.now_fn)().sleep(self.no_event_action_timeout) => { |
183 | | // No update arrived within `no_event_action_timeout`; do additional checks below. |
184 | | } |
185 | | } |
186 | | |
187 | | let awaited_action = self |
188 | | .awaited_action_sub |
189 | | .borrow() |
190 | | .await |
191 | | .err_tip(|| "In MatchingEngineActionStateResult::changed")?; |
192 | | |
193 | | if matches!(awaited_action.state().stage, ActionStage::Queued) { |
194 | | // Actions in queued state do not get periodically updated, |
195 | | // so we don't need to timeout them; go back to waiting without counting an attempt. |
196 | | continue; |
197 | | } |
198 | | |
          // The manager is only held weakly; if the upgrade fails, `?`
          // returns an error carrying the message below.
199 | | let simple_scheduler_state_manager = self |
200 | | .simple_scheduler_state_manager |
201 | | .upgrade() |
202 | 0 | .err_tip(|| format!("Failed to upgrade weak reference to SimpleSchedulerStateManager in MatchingEngineActionStateResult::changed at attempt: {timeout_attempts}"))?; |
203 | | |
204 | | warn!( |
205 | | ?awaited_action, |
206 | | "OperationId {} / {} timed out after {} seconds issuing a retry", |
207 | | awaited_action.operation_id(), |
208 | | awaited_action.state().client_operation_id, |
209 | | self.no_event_action_timeout.as_secs_f32(), |
210 | | ); |
211 | | |
212 | | simple_scheduler_state_manager |
213 | | .timeout_operation_id(awaited_action.operation_id()) |
214 | | .await |
215 | | .err_tip(|| "In MatchingEngineActionStateResult::changed")?; |
216 | | |
          // Checked before the increment, so the error is returned on the
          // timeout round after `timeout_attempts` has reached
          // `MAX_UPDATE_RETRIES`.
217 | | if timeout_attempts >= MAX_UPDATE_RETRIES { |
218 | | return Err(make_err!( |
219 | | Code::Internal, |
220 | | "Failed to update action after {} retries with no error set in MatchingEngineActionStateResult::changed - {} {:?}", |
221 | | MAX_UPDATE_RETRIES, |
222 | | awaited_action.operation_id(), |
223 | | awaited_action.state().stage, |
224 | | )); |
225 | | } |
226 | | timeout_attempts += 1; |
227 | | } |
228 | 50 | } |
229 | | |
          // Borrows the current awaited action and returns clones of its
          // `ActionInfo` and of its optional origin metadata.
230 | 50 | async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> { |
231 | | let awaited_action = self |
232 | | .awaited_action_sub |
233 | | .borrow() |
234 | | .await |
235 | | .err_tip(|| "In MatchingEngineActionStateResult::as_action_info")?; |
236 | | Ok(( |
237 | | awaited_action.action_info().clone(), |
238 | | awaited_action.maybe_origin_metadata().cloned(), |
239 | | )) |
240 | 50 | } |
241 | | } |
242 | | |
243 | | /// `SimpleSchedulerStateManager` is responsible for maintaining the state of the scheduler. |
244 | | /// Scheduler state includes the actions that are queued, active, and recently completed. |
245 | | /// It also includes the workers that are available to execute actions based on allocation |
246 | | /// strategy. |
247 | | #[derive(MetricsComponent)] |
248 | | pub(crate) struct SimpleSchedulerStateManager<T, I, NowFn> |
249 | | where |
250 | | T: AwaitedActionDb, |
251 | | I: InstantWrapper, |
252 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
253 | | { |
254 | | /// Database for storing the state of all actions. |
255 | | #[metric(group = "action_db")] |
256 | | action_db: T, |
257 | | |
258 | | /// Maximum number of times a job can be retried. |
259 | | // TODO(palfrey) This should be a scheduler decorator instead |
260 | | // of always having it on every SimpleScheduler. |
261 | | #[metric(help = "Maximum number of times a job can be retried")] |
262 | | max_job_retries: usize, |
263 | | |
264 | | /// Duration after which an action is considered to be timed out if |
265 | | /// no event is received. |
266 | | #[metric( |
267 | | help = "Duration after which an action is considered to be timed out if no event is received" |
268 | | )] |
269 | | no_event_action_timeout: Duration, |
270 | | |
271 | | /// Mark an unfinished operation as timed out if no client keepalive has been recorded within this duration. |
272 | | /// `apply_filter_predicate` adds this to the action's last client keepalive timestamp and, once that |
273 | | /// instant has passed, tries to store the action as completed with a `DeadlineExceeded` error. |
274 | | client_action_timeout: Duration, |
275 | | |
276 | | // A lock to ensure only one timeout operation is running at a time |
277 | | // on this service. |
278 | | timeout_operation_mux: Mutex<()>, |
279 | | |
280 | | /// Weak reference to self. |
281 | | // We use a weak reference to reduce the risk of a memory leak from |
282 | | // future changes. If this becomes some kind of performance issue, |
283 | | // we can consider using a strong reference. |
284 | | weak_self: Weak<Self>, |
285 | | |
286 | | /// Function to get the current time. |
287 | | now_fn: NowFn, |
288 | | } |
289 | | |
290 | | impl<T, I, NowFn> SimpleSchedulerStateManager<T, I, NowFn> |
291 | | where |
292 | | T: AwaitedActionDb, |
293 | | I: InstantWrapper, |
294 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
295 | | { |
296 | 23 | pub(crate) fn new( |
297 | 23 | max_job_retries: usize, |
298 | 23 | no_event_action_timeout: Duration, |
299 | 23 | client_action_timeout: Duration, |
300 | 23 | action_db: T, |
301 | 23 | now_fn: NowFn, |
302 | 23 | ) -> Arc<Self> { |
             // `Arc::new_cyclic` hands the closure a `Weak` to the allocation being
             // built, which is stored in `weak_self`; the timeout mutex starts unlocked.
303 | 23 | Arc::new_cyclic(|weak_self| Self { |
304 | 23 | action_db, |
305 | 23 | max_job_retries, |
306 | 23 | no_event_action_timeout, |
307 | 23 | client_action_timeout, |
308 | 23 | timeout_operation_mux: Mutex::new(()), |
309 | 23 | weak_self: weak_self.clone(), |
310 | 23 | now_fn, |
311 | 23 | }) |
312 | 23 | } |
313 | | |
314 | 554 | async fn apply_filter_predicate( |
315 | 554 | &self, |
316 | 554 | awaited_action: &AwaitedAction, |
317 | 554 | subscriber: &T::Subscriber, |
318 | 554 | filter: &OperationFilter, |
319 | 554 | ) -> bool { |
320 | | // Note: The caller must filter `client_operation_id`; it is not checked here. |
              // Returns `true` when the action passes every check below.
              // Side effect: if the client keepalive has expired and the action
              // is not finished, this tries to store it as completed with a
              // `DeadlineExceeded` error; an expired action that is not found to
              // be fresh again on reload yields `false`.
321 | | |
322 | 554 | let mut maybe_reloaded_awaited_action: Option<AwaitedAction> = None; |
323 | 554 | if awaited_action.last_client_keepalive_timestamp() + self.client_action_timeout Branch (323:12): [True: 0, False: 0]
Branch (323:12): [True: 0, False: 0]
Branch (323:12): [Folded - Ignored]
Branch (323:12): [True: 0, False: 6]
Branch (323:12): [True: 0, False: 546]
Branch (323:12): [True: 0, False: 2]
|
324 | 554 | < (self.now_fn)().now() |
325 | | { |
326 | | // Stays true unless a reload shows the keepalive is fresh again (our copy was out of date). |
327 | 0 | let mut timed_out = true; |
328 | 0 | if !awaited_action.state().stage.is_finished() { Branch (328:16): [True: 0, False: 0]
Branch (328:16): [True: 0, False: 0]
Branch (328:16): [Folded - Ignored]
Branch (328:16): [True: 0, False: 0]
Branch (328:16): [True: 0, False: 0]
Branch (328:16): [True: 0, False: 0]
|
329 | 0 | let mut state = awaited_action.state().as_ref().clone(); |
330 | 0 | state.stage = ActionStage::Completed(ActionResult { |
331 | 0 | error: Some(make_err!( |
332 | 0 | Code::DeadlineExceeded, |
333 | 0 | "Operation timed out {} seconds of having no more clients listening", |
334 | 0 | self.client_action_timeout.as_secs_f32(), |
335 | 0 | )), |
336 | 0 | ..ActionResult::default() |
337 | 0 | }); |
338 | 0 | let state = Arc::new(state); |
339 | | // We may be competing with a client timestamp update, so try |
340 | | // this a few times (at most `MAX_UPDATE_RETRIES`). |
341 | 0 | for attempt in 1..=MAX_UPDATE_RETRIES { |
342 | 0 | let mut new_awaited_action = match &maybe_reloaded_awaited_action { |
343 | 0 | None => awaited_action.clone(), |
344 | 0 | Some(reloaded_awaited_action) => reloaded_awaited_action.clone(), |
345 | | }; |
346 | 0 | new_awaited_action.worker_set_state(state.clone(), (self.now_fn)().now()); |
347 | 0 | let err = match self |
348 | 0 | .action_db |
349 | 0 | .update_awaited_action(new_awaited_action) |
350 | 0 | .await |
351 | | { |
352 | 0 | Ok(()) => break, |
353 | 0 | Err(err) => err, |
354 | | }; |
355 | | // Reload via the subscriber only if the update failed with `Code::Aborted` and attempts remain. |
356 | 0 | let maybe_awaited_action = |
357 | 0 | if attempt == MAX_UPDATE_RETRIES || err.code != Code::Aborted { Branch (357:28): [True: 0, False: 0]
Branch (357:61): [True: 0, False: 0]
Branch (357:28): [True: 0, False: 0]
Branch (357:61): [True: 0, False: 0]
Branch (357:28): [Folded - Ignored]
Branch (357:61): [Folded - Ignored]
Branch (357:28): [True: 0, False: 0]
Branch (357:61): [True: 0, False: 0]
Branch (357:28): [True: 0, False: 0]
Branch (357:61): [True: 0, False: 0]
Branch (357:28): [True: 0, False: 0]
Branch (357:61): [True: 0, False: 0]
|
358 | 0 | None |
359 | | } else { |
360 | 0 | subscriber.borrow().await.ok() |
361 | | }; |
362 | 0 | if let Some(reloaded_awaited_action) = maybe_awaited_action { Branch (362:28): [True: 0, False: 0]
Branch (362:28): [True: 0, False: 0]
Branch (362:28): [Folded - Ignored]
Branch (362:28): [True: 0, False: 0]
Branch (362:28): [True: 0, False: 0]
Branch (362:28): [True: 0, False: 0]
|
363 | 0 | maybe_reloaded_awaited_action = Some(reloaded_awaited_action); |
364 | 0 | } else { |
365 | 0 | warn!( |
366 | 0 | "Failed to update action to timed out state after client keepalive timeout. This is ok if multiple schedulers tried to set the state at the same time: {err}", |
367 | | ); |
368 | 0 | break; |
369 | | } |
370 | | // Re-check the predicate after reload: a fresh keepalive clears `timed_out`; a finished action ends the retries. |
371 | 0 | if maybe_reloaded_awaited_action Branch (371:24): [True: 0, False: 0]
Branch (371:24): [True: 0, False: 0]
Branch (371:24): [Folded - Ignored]
Branch (371:24): [True: 0, False: 0]
Branch (371:24): [True: 0, False: 0]
Branch (371:24): [True: 0, False: 0]
|
372 | 0 | .as_ref() |
373 | 0 | .is_some_and(|awaited_action| { |
374 | 0 | awaited_action.last_client_keepalive_timestamp() |
375 | 0 | + self.client_action_timeout |
376 | 0 | >= (self.now_fn)().now() |
377 | 0 | }) |
378 | | { |
379 | 0 | timed_out = false; |
380 | 0 | break; |
381 | 0 | } else if maybe_reloaded_awaited_action Branch (381:31): [True: 0, False: 0]
Branch (381:31): [True: 0, False: 0]
Branch (381:31): [Folded - Ignored]
Branch (381:31): [True: 0, False: 0]
Branch (381:31): [True: 0, False: 0]
Branch (381:31): [True: 0, False: 0]
|
382 | 0 | .as_ref() |
383 | 0 | .is_some_and(|awaited_action| awaited_action.state().stage.is_finished()) |
384 | | { |
385 | 0 | break; |
386 | 0 | } |
387 | | } |
388 | 0 | } |
389 | 0 | if timed_out { Branch (389:16): [True: 0, False: 0]
Branch (389:16): [True: 0, False: 0]
Branch (389:16): [Folded - Ignored]
Branch (389:16): [True: 0, False: 0]
Branch (389:16): [True: 0, False: 0]
Branch (389:16): [True: 0, False: 0]
|
390 | 0 | return false; |
391 | 0 | } |
392 | 554 | } |
393 | | // If the action was reloaded, then use that for the rest of the checks |
394 | | // instead of the input parameter. |
395 | 554 | let awaited_action = maybe_reloaded_awaited_action |
396 | 554 | .as_ref() |
397 | 554 | .unwrap_or(awaited_action); |
398 | | |
399 | 554 | if let Some(operation_id0 ) = &filter.operation_id { Branch (399:16): [True: 0, False: 0]
Branch (399:16): [True: 0, False: 0]
Branch (399:16): [Folded - Ignored]
Branch (399:16): [True: 0, False: 6]
Branch (399:16): [True: 0, False: 546]
Branch (399:16): [True: 0, False: 2]
|
400 | 0 | if operation_id != awaited_action.operation_id() { Branch (400:16): [True: 0, False: 0]
Branch (400:16): [True: 0, False: 0]
Branch (400:16): [Folded - Ignored]
Branch (400:16): [True: 0, False: 0]
Branch (400:16): [True: 0, False: 0]
Branch (400:16): [True: 0, False: 0]
|
401 | 0 | return false; |
402 | 0 | } |
403 | 554 | } |
404 | | |
405 | 554 | if filter.worker_id.is_some() && filter.worker_id.as_ref()0 != awaited_action.worker_id() { Branch (405:12): [True: 0, False: 0]
Branch (405:42): [True: 0, False: 0]
Branch (405:12): [True: 0, False: 0]
Branch (405:42): [True: 0, False: 0]
Branch (405:12): [Folded - Ignored]
Branch (405:42): [Folded - Ignored]
Branch (405:12): [True: 0, False: 6]
Branch (405:42): [True: 0, False: 0]
Branch (405:12): [True: 0, False: 546]
Branch (405:42): [True: 0, False: 0]
Branch (405:12): [True: 0, False: 2]
Branch (405:42): [True: 0, False: 0]
|
406 | 0 | return false; |
407 | 554 | } |
408 | | |
409 | | { |
              // A `unique_key` filter can only match `Cacheable` actions;
              // `Uncacheable` ones are rejected whenever the filter is set.
410 | 554 | if let Some(filter_unique_key0 ) = &filter.unique_key { Branch (410:20): [True: 0, False: 0]
Branch (410:20): [True: 0, False: 0]
Branch (410:20): [Folded - Ignored]
Branch (410:20): [True: 0, False: 6]
Branch (410:20): [True: 0, False: 546]
Branch (410:20): [True: 0, False: 2]
|
411 | 0 | match &awaited_action.action_info().unique_qualifier { |
412 | 0 | ActionUniqueQualifier::Cacheable(unique_key) => { |
413 | 0 | if filter_unique_key != unique_key { Branch (413:28): [True: 0, False: 0]
Branch (413:28): [True: 0, False: 0]
Branch (413:28): [Folded - Ignored]
Branch (413:28): [True: 0, False: 0]
Branch (413:28): [True: 0, False: 0]
Branch (413:28): [True: 0, False: 0]
|
414 | 0 | return false; |
415 | 0 | } |
416 | | } |
417 | | ActionUniqueQualifier::Uncacheable(_) => { |
418 | 0 | return false; |
419 | | } |
420 | | } |
421 | 554 | } |
422 | 554 | if let Some(action_digest0 ) = filter.action_digest { Branch (422:20): [True: 0, False: 0]
Branch (422:20): [True: 0, False: 0]
Branch (422:20): [Folded - Ignored]
Branch (422:20): [True: 0, False: 6]
Branch (422:20): [True: 0, False: 546]
Branch (422:20): [True: 0, False: 2]
|
423 | 0 | if action_digest != awaited_action.action_info().digest() { Branch (423:20): [True: 0, False: 0]
Branch (423:20): [True: 0, False: 0]
Branch (423:20): [Folded - Ignored]
Branch (423:20): [True: 0, False: 0]
Branch (423:20): [True: 0, False: 0]
Branch (423:20): [True: 0, False: 0]
|
424 | 0 | return false; |
425 | 0 | } |
426 | 554 | } |
427 | | } |
428 | | |
429 | | { |
              // Both time filters compare against the last worker update
              // timestamp; `completed_before` only rejects finished actions.
430 | 554 | let last_worker_update_timestamp = awaited_action.last_worker_updated_timestamp(); |
431 | 554 | if let Some(worker_update_before0 ) = filter.worker_update_before { Branch (431:20): [True: 0, False: 0]
Branch (431:20): [True: 0, False: 0]
Branch (431:20): [Folded - Ignored]
Branch (431:20): [True: 0, False: 6]
Branch (431:20): [True: 0, False: 546]
Branch (431:20): [True: 0, False: 2]
|
432 | 0 | if worker_update_before < last_worker_update_timestamp { Branch (432:20): [True: 0, False: 0]
Branch (432:20): [True: 0, False: 0]
Branch (432:20): [Folded - Ignored]
Branch (432:20): [True: 0, False: 0]
Branch (432:20): [True: 0, False: 0]
Branch (432:20): [True: 0, False: 0]
|
433 | 0 | return false; |
434 | 0 | } |
435 | 554 | } |
436 | 554 | if let Some(completed_before0 ) = filter.completed_before { Branch (436:20): [True: 0, False: 0]
Branch (436:20): [True: 0, False: 0]
Branch (436:20): [Folded - Ignored]
Branch (436:20): [True: 0, False: 6]
Branch (436:20): [True: 0, False: 546]
Branch (436:20): [True: 0, False: 2]
|
437 | 0 | if awaited_action.state().stage.is_finished() Branch (437:20): [True: 0, False: 0]
Branch (437:20): [True: 0, False: 0]
Branch (437:20): [Folded - Ignored]
Branch (437:20): [True: 0, False: 0]
Branch (437:20): [True: 0, False: 0]
Branch (437:20): [True: 0, False: 0]
|
438 | 0 | && completed_before < last_worker_update_timestamp Branch (438:24): [True: 0, False: 0]
Branch (438:24): [True: 0, False: 0]
Branch (438:24): [Folded - Ignored]
Branch (438:24): [True: 0, False: 0]
Branch (438:24): [True: 0, False: 0]
Branch (438:24): [True: 0, False: 0]
|
439 | | { |
440 | 0 | return false; |
441 | 0 | } |
442 | 554 | } |
443 | 554 | if filter.stages != OperationStageFlags::Any { Branch (443:16): [True: 0, False: 0]
Branch (443:16): [True: 0, False: 0]
Branch (443:16): [Folded - Ignored]
Branch (443:16): [True: 5, False: 1]
Branch (443:16): [True: 543, False: 3]
Branch (443:16): [True: 2, False: 0]
|
              // NOTE(review): `Unknown` maps to `Any`, so it presumably
              // intersects every non-empty stage filter - confirm this is intended.
444 | 550 | let stage_flag = match awaited_action.state().stage { |
445 | 0 | ActionStage::Unknown => OperationStageFlags::Any, |
446 | 0 | ActionStage::CacheCheck => OperationStageFlags::CacheCheck, |
447 | 550 | ActionStage::Queued => OperationStageFlags::Queued, |
448 | 0 | ActionStage::Executing => OperationStageFlags::Executing, |
449 | | ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => { |
450 | 0 | OperationStageFlags::Completed |
451 | | } |
452 | | }; |
453 | 550 | if !filter.stages.intersects(stage_flag) { Branch (453:20): [True: 0, False: 0]
Branch (453:20): [True: 0, False: 0]
Branch (453:20): [Folded - Ignored]
Branch (453:20): [True: 0, False: 5]
Branch (453:20): [True: 0, False: 543]
Branch (453:20): [True: 0, False: 2]
|
454 | 0 | return false; |
455 | 550 | } |
456 | 4 | } |
457 | | } |
458 | | |
459 | 554 | true |
460 | 554 | } |
461 | | |
462 | | /// Let the scheduler know that an operation has timed out from |
463 | | /// the client side (ie: worker has not updated in a while). |
            ///
            /// Returns `Ok(())` without changing anything unless the action is
            /// `Executing` and its last worker update is older than
            /// `no_event_action_timeout`; in that case a `DeadlineExceeded` error is
            /// passed to `assign_operation`. Fails if the operation id is not in the
            /// database or the deadline computation overflows.
464 | 1 | async fn timeout_operation_id(&self, operation_id: &OperationId) -> Result<(), Error> { |
465 | | // Ensure that only one timeout operation is running at a time. |
466 | | // Failing to do this could result in the same operation being |
467 | | // timed out multiple times at the same time. |
468 | | // Note: We could implement this on a per-operation_id basis, but it is quite |
469 | | // complex to manage the locks. |
470 | 1 | let _lock = self.timeout_operation_mux.lock().await; |
471 | | |
472 | 1 | let awaited_action_subscriber = self |
473 | 1 | .action_db |
474 | 1 | .get_by_operation_id(operation_id) |
475 | 1 | .await |
476 | 1 | .err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")?0 |
477 | 1 | .err_tip(|| {0 |
478 | 0 | format!("Operation id {operation_id} does not exist in SimpleSchedulerStateManager::timeout_operation_id") |
479 | 0 | })?; |
480 | | |
481 | 1 | let awaited_action = awaited_action_subscriber |
482 | 1 | .borrow() |
483 | 1 | .await |
484 | 1 | .err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")?0 ; |
485 | | |
486 | | // If the action is not executing, we should not timeout the action. |
487 | 1 | if !matches!0 (awaited_action.state().stage, ActionStage::Executing) { Branch (487:12): [True: 0, False: 0]
Branch (487:12): [True: 0, False: 0]
Branch (487:12): [Folded - Ignored]
Branch (487:12): [True: 0, False: 0]
Branch (487:12): [True: 0, False: 1]
Branch (487:12): [True: 0, False: 0]
|
488 | 0 | return Ok(()); |
489 | 1 | } |
490 | | |
            // Deadline by which the worker should have reported again;
            // `checked_add` turns an overflow into an `Internal` error.
491 | 1 | let worker_should_update_before = awaited_action |
492 | 1 | .last_worker_updated_timestamp() |
493 | 1 | .checked_add(self.no_event_action_timeout) |
494 | 1 | .ok_or_else(|| {0 |
495 | 0 | make_err!( |
496 | 0 | Code::Internal, |
497 | | "Timestamp overflow for operation {operation_id} in SimpleSchedulerStateManager::timeout_operation_id" |
498 | | ) |
499 | 0 | })?; |
500 | 1 | if worker_should_update_before >= (self.now_fn)().now() { Branch (500:12): [True: 0, False: 0]
Branch (500:12): [True: 0, False: 0]
Branch (500:12): [Folded - Ignored]
Branch (500:12): [True: 0, False: 0]
Branch (500:12): [True: 0, False: 1]
Branch (500:12): [True: 0, False: 0]
|
501 | | // The action was updated recently, we should not timeout the action. |
502 | | // This is to prevent timing out actions that have recently been updated |
503 | | // (like multiple clients timeout the same action at the same time). |
504 | 0 | return Ok(()); |
505 | 1 | } |
506 | | |
507 | 1 | self.assign_operation( |
508 | 1 | operation_id, |
509 | 1 | Err(make_err!( |
510 | 1 | Code::DeadlineExceeded, |
511 | 1 | "Operation timed out after {} seconds", |
512 | 1 | self.no_event_action_timeout.as_secs_f32(), |
513 | 1 | )), |
514 | 1 | ) |
515 | 1 | .await |
516 | 1 | } |
517 | | |
518 | 49 | async fn inner_update_operation( |
519 | 49 | &self, |
520 | 49 | operation_id: &OperationId, |
521 | 49 | maybe_worker_id: Option<&WorkerId>, |
522 | 49 | update: UpdateOperationType, |
523 | 49 | ) -> Result<(), Error> { |
524 | 49 | let mut last_err = None; |
525 | 50 | for _ in 0..MAX_UPDATE_RETRIES { |
526 | 50 | let maybe_awaited_action_subscriber = self |
527 | 50 | .action_db |
528 | 50 | .get_by_operation_id(operation_id) |
529 | 50 | .await |
530 | 50 | .err_tip(|| "In SimpleSchedulerStateManager::update_operation")?0 ; |
531 | 50 | let Some(awaited_action_subscriber49 ) = maybe_awaited_action_subscriber else { Branch (531:17): [True: 0, False: 0]
Branch (531:17): [True: 0, False: 0]
Branch (531:17): [Folded - Ignored]
Branch (531:17): [True: 8, False: 0]
Branch (531:17): [True: 39, False: 0]
Branch (531:17): [True: 2, False: 1]
|
532 | | // No action found. It is ok if the action was not found. It |
533 | | // probably means that the action was dropped, but worker was |
534 | | // still processing it. |
535 | 1 | return Ok(()); |
536 | | }; |
537 | | |
538 | 49 | let mut awaited_action = awaited_action_subscriber |
539 | 49 | .borrow() |
540 | 49 | .await |
541 | 49 | .err_tip(|| "In SimpleSchedulerStateManager::update_operation")?0 ; |
542 | | |
543 | | // Make sure the worker id matches the awaited action worker id. |
544 | | // This might happen if the worker sending the update is not the |
545 | | // worker that was assigned. |
546 | 49 | if awaited_action.worker_id().is_some() Branch (546:16): [True: 0, False: 0]
Branch (546:16): [True: 0, False: 0]
Branch (546:16): [Folded - Ignored]
Branch (546:16): [True: 4, False: 4]
Branch (546:16): [True: 13, False: 26]
Branch (546:16): [True: 0, False: 2]
|
547 | 17 | && maybe_worker_id.is_some() Branch (547:20): [True: 0, False: 0]
Branch (547:20): [True: 0, False: 0]
Branch (547:20): [Folded - Ignored]
Branch (547:20): [True: 4, False: 0]
Branch (547:20): [True: 12, False: 1]
Branch (547:20): [True: 0, False: 0]
|
548 | 16 | && maybe_worker_id != awaited_action.worker_id() Branch (548:20): [True: 0, False: 0]
Branch (548:20): [True: 0, False: 0]
Branch (548:20): [Folded - Ignored]
Branch (548:20): [True: 0, False: 4]
Branch (548:20): [True: 0, False: 12]
Branch (548:20): [True: 0, False: 0]
|
549 | | { |
550 | | // If another worker is already assigned to the action, another |
551 | | // worker probably picked up the action. We should not update the |
552 | | // action in this case and abort this operation. |
553 | 0 | let err = make_err!( |
554 | 0 | Code::Aborted, |
555 | | "Worker ids do not match - {:?} != {:?} for {:?}", |
556 | | maybe_worker_id, |
557 | 0 | awaited_action.worker_id(), |
558 | | awaited_action, |
559 | | ); |
560 | 0 | info!( |
561 | 0 | "Worker ids do not match - {:?} != {:?} for {:?}. This is probably due to another worker picking up the action.", |
562 | | maybe_worker_id, |
563 | 0 | awaited_action.worker_id(), |
564 | | awaited_action, |
565 | | ); |
566 | 0 | return Err(err); |
567 | 49 | } |
568 | | |
569 | | // Make sure we don't update an action that is already completed. |
570 | 49 | if awaited_action.state().stage.is_finished() { Branch (570:16): [True: 0, False: 0]
Branch (570:16): [True: 0, False: 0]
Branch (570:16): [Folded - Ignored]
Branch (570:16): [True: 0, False: 8]
Branch (570:16): [True: 0, False: 39]
Branch (570:16): [True: 0, False: 2]
|
571 | 0 | match &update { |
572 | | UpdateOperationType::UpdateWithDisconnect | UpdateOperationType::KeepAlive => { |
573 | | // No need to error a keep-alive when it's completed, it's just |
574 | | // unnecessary log noise. |
575 | 0 | return Ok(()); |
576 | | } |
577 | | _ => { |
578 | 0 | return Err(make_err!( |
579 | 0 | Code::Internal, |
580 | 0 | "Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}", |
581 | 0 | awaited_action.state().stage, |
582 | 0 | maybe_worker_id, |
583 | 0 | )); |
584 | | } |
585 | | } |
586 | 49 | } |
587 | | |
588 | 49 | let stage = match &update { |
589 | | UpdateOperationType::KeepAlive => { |
590 | 0 | awaited_action.worker_keep_alive((self.now_fn)().now()); |
591 | 0 | match self |
592 | 0 | .action_db |
593 | 0 | .update_awaited_action(awaited_action) |
594 | 0 | .await |
595 | 0 | .err_tip(|| "Failed to send KeepAlive in SimpleSchedulerStateManager::update_operation") { |
596 | | // Try again if there was a version mismatch. |
597 | 0 | Err(err) if err.code == Code::Aborted => { Branch (597:37): [True: 0, False: 0]
Branch (597:37): [True: 0, False: 0]
Branch (597:37): [Folded - Ignored]
Branch (597:37): [True: 0, False: 0]
Branch (597:37): [True: 0, False: 0]
Branch (597:37): [True: 0, False: 0]
|
598 | 0 | last_err = Some(err); |
599 | 0 | continue; |
600 | | } |
601 | 0 | result => return result, |
602 | | } |
603 | | } |
604 | 37 | UpdateOperationType::UpdateWithActionStage(stage) => stage.clone(), |
605 | 12 | UpdateOperationType::UpdateWithError(err) => { |
606 | | // Don't count a backpressure failure as an attempt for an action. |
607 | 12 | let due_to_backpressure = err.code == Code::ResourceExhausted; |
608 | 12 | if !due_to_backpressure { Branch (608:24): [True: 0, False: 0]
Branch (608:24): [True: 0, False: 0]
Branch (608:24): [Folded - Ignored]
Branch (608:24): [True: 4, False: 0]
Branch (608:24): [True: 8, False: 0]
Branch (608:24): [True: 0, False: 0]
|
609 | 12 | awaited_action.attempts += 1; |
610 | 12 | }0 |
611 | | |
612 | 12 | if awaited_action.attempts > self.max_job_retries { Branch (612:24): [True: 0, False: 0]
Branch (612:24): [True: 0, False: 0]
Branch (612:24): [Folded - Ignored]
Branch (612:24): [True: 1, False: 3]
Branch (612:24): [True: 1, False: 7]
Branch (612:24): [True: 0, False: 0]
|
613 | 2 | ActionStage::Completed(ActionResult { |
614 | 2 | execution_metadata: ExecutionMetadata { |
615 | 2 | worker: maybe_worker_id.map_or_else(String::default, ToString::to_string), |
616 | 2 | ..ExecutionMetadata::default() |
617 | 2 | }, |
618 | 2 | error: Some(err.clone().merge(make_err!( |
619 | 2 | Code::Internal, |
620 | 2 | "Job cancelled because it attempted to execute too many times {} > {} times {}", |
621 | 2 | awaited_action.attempts, |
622 | 2 | self.max_job_retries, |
623 | 2 | format!("for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"), |
624 | 2 | ))), |
625 | 2 | ..ActionResult::default() |
626 | 2 | }) |
627 | | } else { |
628 | 10 | ActionStage::Queued |
629 | | } |
630 | | } |
631 | 0 | UpdateOperationType::UpdateWithDisconnect => ActionStage::Queued, |
632 | | // We shouldn't get here, but we just ignore it if we do. |
633 | | UpdateOperationType::ExecutionComplete => { |
634 | 0 | warn!("inner_update_operation got an ExecutionComplete, that's unexpected."); |
635 | 0 | return Ok(()); |
636 | | } |
637 | | }; |
638 | 49 | let now = (self.now_fn)().now(); |
639 | 49 | if matches!39 (stage, ActionStage::Queued) { |
640 | 10 | // If the action is queued, we need to unset the worker id regardless of |
641 | 10 | // which worker sent the update. |
642 | 10 | awaited_action.set_worker_id(None, now); |
643 | 39 | } else { |
644 | 39 | awaited_action.set_worker_id(maybe_worker_id.cloned(), now); |
645 | 39 | } |
646 | 49 | awaited_action.worker_set_state( |
647 | 49 | Arc::new(ActionState { |
648 | 49 | stage, |
649 | 49 | // Client id is not known here, it is the responsibility of |
650 | 49 | // the subscriber impl to replace this with the |
651 | 49 | // correct client id. |
652 | 49 | client_operation_id: operation_id.clone(), |
653 | 49 | action_digest: awaited_action.action_info().digest(), |
654 | 49 | }), |
655 | 49 | now, |
656 | | ); |
657 | | |
658 | 49 | let update_action_result = self |
659 | 49 | .action_db |
660 | 49 | .update_awaited_action(awaited_action) |
661 | 49 | .await |
662 | 49 | .err_tip(|| "In SimpleSchedulerStateManager::update_operation"); |
663 | 49 | if let Err(err2 ) = update_action_result { Branch (663:20): [True: 0, False: 0]
Branch (663:20): [True: 0, False: 0]
Branch (663:20): [Folded - Ignored]
Branch (663:20): [True: 0, False: 8]
Branch (663:20): [True: 0, False: 39]
Branch (663:20): [True: 2, False: 0]
|
664 | | // We use Aborted to signal that the action was not |
665 | | // updated due to the data being set was not the latest |
666 | | // but can be retried. |
667 | 2 | if err.code == Code::Aborted { Branch (667:20): [True: 0, False: 0]
Branch (667:20): [True: 0, False: 0]
Branch (667:20): [Folded - Ignored]
Branch (667:20): [True: 0, False: 0]
Branch (667:20): [True: 0, False: 0]
Branch (667:20): [True: 1, False: 1]
|
668 | 1 | last_err = Some(err); |
669 | 1 | continue; |
670 | 1 | } |
671 | 1 | return Err(err); |
672 | 47 | } |
673 | 47 | return Ok(()); |
674 | | } |
675 | 0 | Err(last_err.unwrap_or_else(|| { |
676 | 0 | make_err!( |
677 | 0 | Code::Internal, |
678 | | "Failed to update action after {} retries with no error set", |
679 | | MAX_UPDATE_RETRIES, |
680 | | ) |
681 | 0 | })) |
682 | 49 | } |
683 | | |
684 | 30 | async fn inner_add_operation( |
685 | 30 | &self, |
686 | 30 | new_client_operation_id: OperationId, |
687 | 30 | action_info: Arc<ActionInfo>, |
688 | 30 | ) -> Result<T::Subscriber, Error> { |
689 | 30 | self.action_db |
690 | 30 | .add_action( |
691 | 30 | new_client_operation_id, |
692 | 30 | action_info, |
693 | 30 | self.no_event_action_timeout, |
694 | 30 | ) |
695 | 30 | .await |
696 | 30 | .err_tip(|| "In SimpleSchedulerStateManager::add_operation") |
697 | 30 | } |
698 | | |
699 | 615 | async fn inner_filter_operations<'a, F>( |
700 | 615 | &'a self, |
701 | 615 | filter: OperationFilter, |
702 | 615 | to_action_state_result: F, |
703 | 615 | ) -> Result<ActionStateResultStream<'a>, Error> |
704 | 615 | where |
705 | 615 | F: Fn(T::Subscriber) -> Box<dyn ActionStateResult> + Send + Sync + 'a, |
706 | 615 | { |
707 | 611 | const fn sorted_awaited_action_state_for_flags( |
708 | 611 | stage: OperationStageFlags, |
709 | 611 | ) -> Option<SortedAwaitedActionState> { |
710 | 611 | match stage { |
711 | 0 | OperationStageFlags::CacheCheck => Some(SortedAwaitedActionState::CacheCheck), |
712 | 611 | OperationStageFlags::Queued => Some(SortedAwaitedActionState::Queued), |
713 | 0 | OperationStageFlags::Executing => Some(SortedAwaitedActionState::Executing), |
714 | 0 | OperationStageFlags::Completed => Some(SortedAwaitedActionState::Completed), |
715 | 0 | _ => None, |
716 | | } |
717 | 611 | } |
718 | | |
719 | 615 | if let Some(operation_id0 ) = &filter.operation_id { Branch (719:16): [True: 0, False: 0]
Branch (719:16): [True: 0, False: 0]
Branch (719:16): [True: 0, False: 0]
Branch (719:16): [True: 0, False: 0]
Branch (719:16): [Folded - Ignored]
Branch (719:16): [True: 0, False: 1]
Branch (719:16): [True: 0, False: 17]
Branch (719:16): [True: 0, False: 503]
Branch (719:16): [True: 0, False: 90]
Branch (719:16): [True: 0, False: 0]
Branch (719:16): [True: 0, False: 4]
|
720 | 0 | let maybe_subscriber = self |
721 | 0 | .action_db |
722 | 0 | .get_by_operation_id(operation_id) |
723 | 0 | .await |
724 | 0 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?; |
725 | 0 | let Some(subscriber) = maybe_subscriber else { Branch (725:17): [True: 0, False: 0]
Branch (725:17): [True: 0, False: 0]
Branch (725:17): [True: 0, False: 0]
Branch (725:17): [True: 0, False: 0]
Branch (725:17): [Folded - Ignored]
Branch (725:17): [True: 0, False: 0]
Branch (725:17): [True: 0, False: 0]
Branch (725:17): [True: 0, False: 0]
Branch (725:17): [True: 0, False: 0]
Branch (725:17): [True: 0, False: 0]
Branch (725:17): [True: 0, False: 0]
|
726 | 0 | return Ok(Box::pin(stream::empty())); |
727 | | }; |
728 | 0 | let awaited_action = subscriber |
729 | 0 | .borrow() |
730 | 0 | .await |
731 | 0 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?; |
732 | 0 | if !self Branch (732:16): [True: 0, False: 0]
Branch (732:16): [True: 0, False: 0]
Branch (732:16): [True: 0, False: 0]
Branch (732:16): [True: 0, False: 0]
Branch (732:16): [Folded - Ignored]
Branch (732:16): [True: 0, False: 0]
Branch (732:16): [True: 0, False: 0]
Branch (732:16): [True: 0, False: 0]
Branch (732:16): [True: 0, False: 0]
Branch (732:16): [True: 0, False: 0]
Branch (732:16): [True: 0, False: 0]
|
733 | 0 | .apply_filter_predicate(&awaited_action, &subscriber, &filter) |
734 | 0 | .await |
735 | | { |
736 | 0 | return Ok(Box::pin(stream::empty())); |
737 | 0 | } |
738 | 0 | return Ok(Box::pin(stream::once(async move { |
739 | 0 | to_action_state_result(subscriber) |
740 | 0 | }))); |
741 | 615 | } |
742 | 615 | if let Some(client_operation_id4 ) = &filter.client_operation_id { Branch (742:16): [True: 0, False: 0]
Branch (742:16): [True: 0, False: 0]
Branch (742:16): [True: 0, False: 0]
Branch (742:16): [True: 0, False: 0]
Branch (742:16): [Folded - Ignored]
Branch (742:16): [True: 1, False: 0]
Branch (742:16): [True: 0, False: 17]
Branch (742:16): [True: 3, False: 500]
Branch (742:16): [True: 0, False: 90]
Branch (742:16): [True: 0, False: 0]
Branch (742:16): [True: 0, False: 4]
|
743 | 4 | let maybe_subscriber = self |
744 | 4 | .action_db |
745 | 4 | .get_awaited_action_by_id(client_operation_id) |
746 | 4 | .await |
747 | 4 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?0 ; |
748 | 4 | let Some(subscriber) = maybe_subscriber else { Branch (748:17): [True: 0, False: 0]
Branch (748:17): [True: 0, False: 0]
Branch (748:17): [True: 0, False: 0]
Branch (748:17): [True: 0, False: 0]
Branch (748:17): [Folded - Ignored]
Branch (748:17): [True: 1, False: 0]
Branch (748:17): [True: 0, False: 0]
Branch (748:17): [True: 3, False: 0]
Branch (748:17): [True: 0, False: 0]
Branch (748:17): [True: 0, False: 0]
Branch (748:17): [True: 0, False: 0]
|
749 | 0 | return Ok(Box::pin(stream::empty())); |
750 | | }; |
751 | 4 | let awaited_action = subscriber |
752 | 4 | .borrow() |
753 | 4 | .await |
754 | 4 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?0 ; |
755 | 4 | if !self Branch (755:16): [True: 0, False: 0]
Branch (755:16): [True: 0, False: 0]
Branch (755:16): [True: 0, False: 0]
Branch (755:16): [True: 0, False: 0]
Branch (755:16): [Folded - Ignored]
Branch (755:16): [True: 0, False: 1]
Branch (755:16): [True: 0, False: 0]
Branch (755:16): [True: 0, False: 3]
Branch (755:16): [True: 0, False: 0]
Branch (755:16): [True: 0, False: 0]
Branch (755:16): [True: 0, False: 0]
|
756 | 4 | .apply_filter_predicate(&awaited_action, &subscriber, &filter) |
757 | 4 | .await |
758 | | { |
759 | 0 | return Ok(Box::pin(stream::empty())); |
760 | 4 | } |
761 | 4 | return Ok(Box::pin(stream::once(async move { |
762 | 4 | to_action_state_result(subscriber) |
763 | 4 | }))); |
764 | 611 | } |
765 | | |
766 | 611 | let Some(sorted_awaited_action_state) = Branch (766:13): [True: 0, False: 0]
Branch (766:13): [True: 0, False: 0]
Branch (766:13): [True: 0, False: 0]
Branch (766:13): [True: 0, False: 0]
Branch (766:13): [Folded - Ignored]
Branch (766:13): [True: 0, False: 0]
Branch (766:13): [True: 17, False: 0]
Branch (766:13): [True: 500, False: 0]
Branch (766:13): [True: 90, False: 0]
Branch (766:13): [True: 0, False: 0]
Branch (766:13): [True: 4, False: 0]
|
767 | 611 | sorted_awaited_action_state_for_flags(filter.stages) |
768 | | else { |
769 | 0 | let mut all_items: Vec<_> = self |
770 | 0 | .action_db |
771 | 0 | .get_all_awaited_actions() |
772 | 0 | .await |
773 | 0 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")? |
774 | 0 | .and_then(|awaited_action_subscriber| async move { |
775 | 0 | let awaited_action = awaited_action_subscriber |
776 | 0 | .borrow() |
777 | 0 | .await |
778 | 0 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?; |
779 | 0 | Ok((awaited_action_subscriber, awaited_action)) |
780 | 0 | }) |
781 | 0 | .try_filter_map(|(subscriber, awaited_action)| { |
782 | 0 | let filter = filter.clone(); |
783 | 0 | async move { |
784 | 0 | Ok(self |
785 | 0 | .apply_filter_predicate(&awaited_action, &subscriber, &filter) |
786 | 0 | .await |
787 | 0 | .then_some((subscriber, awaited_action.sort_key()))) |
788 | 0 | } |
789 | 0 | }) |
790 | 0 | .try_collect() |
791 | 0 | .await |
792 | 0 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?; |
793 | 0 | match filter.order_by_priority_direction { |
794 | 0 | Some(OrderDirection::Asc) => all_items.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)), |
795 | 0 | Some(OrderDirection::Desc) => all_items.sort_unstable_by(|(_, a), (_, b)| b.cmp(a)), |
796 | 0 | None => {} |
797 | | } |
798 | 0 | return Ok(Box::pin(stream::iter( |
799 | 0 | all_items |
800 | 0 | .into_iter() |
801 | 0 | .map(move |(subscriber, _)| to_action_state_result(subscriber)), |
802 | | ))); |
803 | | }; |
804 | | |
805 | 611 | let desc = matches!500 ( |
806 | 111 | filter.order_by_priority_direction, |
807 | | Some(OrderDirection::Desc) |
808 | | ); |
809 | 611 | let stream = self |
810 | 611 | .action_db |
811 | 611 | .get_range_of_actions( |
812 | 611 | sorted_awaited_action_state, |
813 | 611 | Bound::Unbounded, |
814 | 611 | Bound::Unbounded, |
815 | 611 | desc, |
816 | 611 | ) |
817 | 611 | .await |
818 | 611 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?0 |
819 | 611 | .and_then(|awaited_action_subscriber| async move {550 |
820 | 550 | let awaited_action = awaited_action_subscriber |
821 | 550 | .borrow() |
822 | 550 | .await |
823 | 550 | .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?0 ; |
824 | 550 | Ok((awaited_action_subscriber, awaited_action)) |
825 | 1.10k | }) |
826 | 611 | .try_filter_map(move |(subscriber, awaited_action)| {550 |
827 | 550 | let filter = filter.clone(); |
828 | 550 | async move { |
829 | 550 | Ok(self |
830 | 550 | .apply_filter_predicate(&awaited_action, &subscriber, &filter) |
831 | 550 | .await |
832 | 550 | .then_some(subscriber)) |
833 | 550 | } |
834 | 550 | }) |
835 | 611 | .map(move |result| -> Box<dyn ActionStateResult> {550 |
836 | 550 | result.map_or_else( |
837 | 0 | |e| -> Box<dyn ActionStateResult> { Box::new(ErrorActionStateResult(e)) }, |
838 | 550 | |v| -> Box<dyn ActionStateResult> { to_action_state_result(v) }, |
839 | | ) |
840 | 550 | }); |
841 | 611 | Ok(Box::pin(stream)) |
842 | 615 | } |
843 | | } |
844 | | |
845 | | #[async_trait] |
846 | | impl<T, I, NowFn> ClientStateManager for SimpleSchedulerStateManager<T, I, NowFn> |
847 | | where |
848 | | T: AwaitedActionDb, |
849 | | I: InstantWrapper, |
850 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
851 | | { |
852 | | async fn add_action( |
853 | | &self, |
854 | | client_operation_id: OperationId, |
855 | | action_info: Arc<ActionInfo>, |
856 | 30 | ) -> Result<Box<dyn ActionStateResult>, Error> { |
857 | | let sub = self |
858 | | .inner_add_operation(client_operation_id, action_info.clone()) |
859 | | .await?; |
860 | | |
861 | | Ok(Box::new(ClientActionStateResult::new( |
862 | | sub, |
863 | | self.weak_self.clone(), |
864 | | self.no_event_action_timeout, |
865 | | self.now_fn.clone(), |
866 | | ))) |
867 | 30 | } |
868 | | |
869 | | async fn filter_operations<'a>( |
870 | | &'a self, |
871 | | filter: OperationFilter, |
872 | 504 | ) -> Result<ActionStateResultStream<'a>, Error> { |
873 | 504 | self.inner_filter_operations(filter, move |rx| { |
874 | 504 | Box::new(ClientActionStateResult::new( |
875 | 504 | rx, |
876 | 504 | self.weak_self.clone(), |
877 | 504 | self.no_event_action_timeout, |
878 | 504 | self.now_fn.clone(), |
879 | 504 | )) |
880 | 504 | }) |
881 | | .await |
882 | 504 | } |
883 | | |
884 | 0 | fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> { |
885 | 0 | None |
886 | 0 | } |
887 | | } |
888 | | |
889 | | #[async_trait] |
890 | | impl<T, I, NowFn> WorkerStateManager for SimpleSchedulerStateManager<T, I, NowFn> |
891 | | where |
892 | | T: AwaitedActionDb, |
893 | | I: InstantWrapper, |
894 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
895 | | { |
896 | | async fn update_operation( |
897 | | &self, |
898 | | operation_id: &OperationId, |
899 | | worker_id: &WorkerId, |
900 | | update: UpdateOperationType, |
901 | 16 | ) -> Result<(), Error> { |
902 | | self.inner_update_operation(operation_id, Some(worker_id), update) |
903 | | .await |
904 | 16 | } |
905 | | } |
906 | | |
907 | | #[async_trait] |
908 | | impl<T, I, NowFn> MatchingEngineStateManager for SimpleSchedulerStateManager<T, I, NowFn> |
909 | | where |
910 | | T: AwaitedActionDb, |
911 | | I: InstantWrapper, |
912 | | NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, |
913 | | { |
914 | | async fn filter_operations<'a>( |
915 | | &'a self, |
916 | | filter: OperationFilter, |
917 | 111 | ) -> Result<ActionStateResultStream<'a>, Error> { |
918 | 50 | self.inner_filter_operations(filter, |rx| { |
919 | 50 | Box::new(MatchingEngineActionStateResult::new( |
920 | 50 | rx, |
921 | 50 | self.weak_self.clone(), |
922 | 50 | self.no_event_action_timeout, |
923 | 50 | self.now_fn.clone(), |
924 | 50 | )) |
925 | 50 | }) |
926 | | .await |
927 | 111 | } |
928 | | |
929 | | async fn assign_operation( |
930 | | &self, |
931 | | operation_id: &OperationId, |
932 | | worker_id_or_reason_for_unassign: Result<&WorkerId, Error>, |
933 | 33 | ) -> Result<(), Error> { |
934 | | let (maybe_worker_id, update) = match worker_id_or_reason_for_unassign { |
935 | | Ok(worker_id) => ( |
936 | | Some(worker_id), |
937 | | UpdateOperationType::UpdateWithActionStage(ActionStage::Executing), |
938 | | ), |
939 | | Err(err) => (None, UpdateOperationType::UpdateWithError(err)), |
940 | | }; |
941 | | self.inner_update_operation(operation_id, maybe_worker_id, update) |
942 | | .await |
943 | 33 | } |
944 | | } |