Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-scheduler/src/simple_scheduler_state_manager.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::ops::Bound;
16
use std::string::ToString;
17
use std::sync::{Arc, Weak};
18
use std::time::{Duration, SystemTime};
19
20
use async_lock::Mutex;
21
use async_trait::async_trait;
22
use futures::{future, stream, FutureExt, StreamExt, TryStreamExt};
23
use nativelink_error::{make_err, Code, Error, ResultExt};
24
use nativelink_metric::MetricsComponent;
25
use nativelink_util::action_messages::{
26
    ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ExecutionMetadata,
27
    OperationId, WorkerId,
28
};
29
use nativelink_util::instant_wrapper::InstantWrapper;
30
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
31
use nativelink_util::operation_state_manager::{
32
    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
33
    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType, WorkerStateManager,
34
};
35
use tracing::{event, Level};
36
37
use super::awaited_action_db::{
38
    AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedActionState,
39
};
40
41
/// Maximum number of times an update to the database
42
/// can fail before giving up.
43
const MAX_UPDATE_RETRIES: usize = 5;
44
45
/// Simple struct that implements the ActionStateResult trait and always returns an error.
46
struct ErrorActionStateResult(Error);
47
48
#[async_trait]
49
impl ActionStateResult for ErrorActionStateResult {
50
0
    async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
51
0
        Err(self.0.clone())
52
0
    }
53
54
0
    async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
55
0
        Err(self.0.clone())
56
0
    }
57
58
0
    async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
59
0
        Err(self.0.clone())
60
0
    }
61
}
62
63
45
fn apply_filter_predicate(awaited_action: &AwaitedAction, filter: &OperationFilter) -> bool {
64
    // Note: The caller must filter `client_operation_id`.
65
66
45
    if let Some(
operation_id0
) = &filter.operation_id {
  Branch (66:12): [True: 0, False: 45]
  Branch (66:12): [Folded - Ignored]
67
0
        if operation_id != awaited_action.operation_id() {
  Branch (67:12): [True: 0, False: 0]
  Branch (67:12): [Folded - Ignored]
68
0
            return false;
69
0
        }
70
45
    }
71
72
45
    if filter.worker_id.is_some() && 
filter.worker_id != awaited_action.worker_id()0
{
  Branch (72:8): [True: 0, False: 45]
  Branch (72:38): [True: 0, False: 0]
  Branch (72:8): [Folded - Ignored]
  Branch (72:38): [Folded - Ignored]
73
0
        return false;
74
45
    }
75
76
    {
77
45
        if let Some(
filter_unique_key0
) = &filter.unique_key {
  Branch (77:16): [True: 0, False: 45]
  Branch (77:16): [Folded - Ignored]
78
0
            match &awaited_action.action_info().unique_qualifier {
79
0
                ActionUniqueQualifier::Cachable(unique_key) => {
80
0
                    if filter_unique_key != unique_key {
  Branch (80:24): [True: 0, False: 0]
  Branch (80:24): [Folded - Ignored]
81
0
                        return false;
82
0
                    }
83
                }
84
                ActionUniqueQualifier::Uncachable(_) => {
85
0
                    return false;
86
                }
87
            }
88
45
        }
89
45
        if let Some(
action_digest0
) = filter.action_digest {
  Branch (89:16): [True: 0, False: 45]
  Branch (89:16): [Folded - Ignored]
90
0
            if action_digest != awaited_action.action_info().digest() {
  Branch (90:16): [True: 0, False: 0]
  Branch (90:16): [Folded - Ignored]
91
0
                return false;
92
0
            }
93
45
        }
94
    }
95
96
    {
97
45
        let last_worker_update_timestamp = awaited_action.last_worker_updated_timestamp();
98
45
        if let Some(
worker_update_before0
) = filter.worker_update_before {
  Branch (98:16): [True: 0, False: 45]
  Branch (98:16): [Folded - Ignored]
99
0
            if worker_update_before < last_worker_update_timestamp {
  Branch (99:16): [True: 0, False: 0]
  Branch (99:16): [Folded - Ignored]
100
0
                return false;
101
0
            }
102
45
        }
103
45
        if let Some(
completed_before0
) = filter.completed_before {
  Branch (103:16): [True: 0, False: 45]
  Branch (103:16): [Folded - Ignored]
104
0
            if awaited_action.state().stage.is_finished()
  Branch (104:16): [True: 0, False: 0]
  Branch (104:16): [Folded - Ignored]
105
0
                && completed_before < last_worker_update_timestamp
  Branch (105:20): [True: 0, False: 0]
  Branch (105:20): [Folded - Ignored]
106
            {
107
0
                return false;
108
0
            }
109
45
        }
110
45
        if filter.stages != OperationStageFlags::Any {
  Branch (110:12): [True: 42, False: 3]
  Branch (110:12): [Folded - Ignored]
111
42
            let stage_flag = match awaited_action.state().stage {
112
0
                ActionStage::Unknown => OperationStageFlags::Any,
113
0
                ActionStage::CacheCheck => OperationStageFlags::CacheCheck,
114
42
                ActionStage::Queued => OperationStageFlags::Queued,
115
0
                ActionStage::Executing => OperationStageFlags::Executing,
116
0
                ActionStage::Completed(_) => OperationStageFlags::Completed,
117
0
                ActionStage::CompletedFromCache(_) => OperationStageFlags::Completed,
118
            };
119
42
            if !filter.stages.intersects(stage_flag) {
  Branch (119:16): [True: 0, False: 42]
  Branch (119:16): [Folded - Ignored]
120
0
                return false;
121
42
            }
122
3
        }
123
    }
124
125
45
    true
126
45
}
127
128
struct ClientActionStateResult<U, T, I, NowFn>
129
where
130
    U: AwaitedActionSubscriber,
131
    T: AwaitedActionDb,
132
    I: InstantWrapper,
133
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
134
{
135
    inner: MatchingEngineActionStateResult<U, T, I, NowFn>,
136
}
137
138
impl<U, T, I, NowFn> ClientActionStateResult<U, T, I, NowFn>
139
where
140
    U: AwaitedActionSubscriber,
141
    T: AwaitedActionDb,
142
    I: InstantWrapper,
143
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
144
{
145
28
    fn new(
146
28
        sub: U,
147
28
        simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
148
28
        no_event_action_timeout: Duration,
149
28
        now_fn: NowFn,
150
28
    ) -> Self {
151
28
        Self {
152
28
            inner: MatchingEngineActionStateResult::new(
153
28
                sub,
154
28
                simple_scheduler_state_manager,
155
28
                no_event_action_timeout,
156
28
                now_fn,
157
28
            ),
158
28
        }
159
28
    }
160
}
161
162
#[async_trait]
163
impl<U, T, I, NowFn> ActionStateResult for ClientActionStateResult<U, T, I, NowFn>
164
where
165
    U: AwaitedActionSubscriber,
166
    T: AwaitedActionDb,
167
    I: InstantWrapper,
168
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
169
{
170
3
    async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
171
3
        self.inner.as_state().
await0
172
6
    }
173
174
44
    async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
175
1.49k
        
self.inner.changed()44
.await
176
86
    }
177
178
0
    async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
179
0
        self.inner.as_action_info().await
180
0
    }
181
}
182
183
struct MatchingEngineActionStateResult<U, T, I, NowFn>
184
where
185
    U: AwaitedActionSubscriber,
186
    T: AwaitedActionDb,
187
    I: InstantWrapper,
188
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
189
{
190
    awaited_action_sub: U,
191
    simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
192
    no_event_action_timeout: Duration,
193
    now_fn: NowFn,
194
}
195
impl<U, T, I, NowFn> MatchingEngineActionStateResult<U, T, I, NowFn>
196
where
197
    U: AwaitedActionSubscriber,
198
    T: AwaitedActionDb,
199
    I: InstantWrapper,
200
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
201
{
202
70
    fn new(
203
70
        awaited_action_sub: U,
204
70
        simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
205
70
        no_event_action_timeout: Duration,
206
70
        now_fn: NowFn,
207
70
    ) -> Self {
208
70
        Self {
209
70
            awaited_action_sub,
210
70
            simple_scheduler_state_manager,
211
70
            no_event_action_timeout,
212
70
            now_fn,
213
70
        }
214
70
    }
215
}
216
217
#[async_trait]
218
impl<U, T, I, NowFn> ActionStateResult for MatchingEngineActionStateResult<U, T, I, NowFn>
219
where
220
    U: AwaitedActionSubscriber,
221
    T: AwaitedActionDb,
222
    I: InstantWrapper,
223
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
224
{
225
31
    async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
226
31
        Ok(self
227
31
            .awaited_action_sub
228
31
            .borrow()
229
0
            .await
230
31
            .err_tip(|| 
"In MatchingEngineActionStateResult::as_state"0
)
?0
231
31
            .state()
232
31
            .clone())
233
62
    }
234
235
44
    async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
236
44
        let mut timeout_attempts = 0;
237
        loop {
238
54
            tokio::select! {
239
54
                
awaited_action_result42
= self.awaited_action_sub.changed() => {
240
42
                    return awaited_action_result
241
42
                        .err_tip(|| 
"In MatchingEngineActionStateResult::changed"0
)
242
42
                        .map(|v| v.state().clone());
243
                }
244
54
                _ = (self.now_fn)().sleep(self.no_event_action_timeout) => {
245
10
                    // Timeout happened, do additional checks below.
246
10
                }
247
            }
248
249
10
            let awaited_action = self
250
10
                .awaited_action_sub
251
10
                .borrow()
252
0
                .await
253
10
                .err_tip(|| 
"In MatchingEngineActionStateResult::changed"0
)
?0
;
254
255
10
            if 
matches!1
(awaited_action.state().stage, ActionStage::Queued) {
256
                // Actions in queued state do not get periodically updated,
257
                // so we don't need to timeout them.
258
9
                continue;
259
1
            }
260
261
1
            let simple_scheduler_state_manager = self
262
1
                .simple_scheduler_state_manager
263
1
                .upgrade()
264
1
                .err_tip(|| 
format!("Failed to upgrade weak reference to SimpleSchedulerStateManager in MatchingEngineActionStateResult::changed at attempt: {timeout_attempts}")0
)
?0
;
265
266
1
            event!(
267
1
                Level::WARN,
268
                ?awaited_action,
269
1
                "OperationId {} / {} timed out after {} seconds issuing a retry",
270
1
                awaited_action.operation_id(),
271
1
                awaited_action.state().client_operation_id,
272
1
                self.no_event_action_timeout.as_secs_f32(),
273
            );
274
275
1
            simple_scheduler_state_manager
276
1
                .timeout_operation_id(awaited_action.operation_id())
277
0
                .await
278
1
                .err_tip(|| 
"In MatchingEngineActionStateResult::changed"0
)
?0
;
279
280
1
            if timeout_attempts >= MAX_UPDATE_RETRIES {
  Branch (280:16): [True: 0, False: 0]
  Branch (280:16): [True: 0, False: 0]
  Branch (280:16): [Folded - Ignored]
  Branch (280:16): [True: 0, False: 1]
  Branch (280:16): [True: 0, False: 0]
281
0
                return Err(make_err!(
282
0
                    Code::Internal,
283
0
                    "Failed to update action after {} retries with no error set in MatchingEngineActionStateResult::changed - {} {:?}",
284
0
                    MAX_UPDATE_RETRIES,
285
0
                    awaited_action.operation_id(),
286
0
                    awaited_action.state().stage,
287
0
                ));
288
1
            }
289
1
            timeout_attempts += 1;
290
        }
291
86
    }
292
293
42
    async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
294
42
        Ok(self
295
42
            .awaited_action_sub
296
42
            .borrow()
297
0
            .await
298
42
            .err_tip(|| 
"In MatchingEngineActionStateResult::as_action_info"0
)
?0
299
42
            .action_info()
300
42
            .clone())
301
84
    }
302
}
303
304
/// SimpleSchedulerStateManager is responsible for maintaining the state of the scheduler.
305
/// Scheduler state includes the actions that are queued, active, and recently completed.
306
/// It also includes the workers that are available to execute actions based on allocation
307
/// strategy.
308
0
#[derive(MetricsComponent)]
309
pub struct SimpleSchedulerStateManager<T, I, NowFn>
310
where
311
    T: AwaitedActionDb,
312
    I: InstantWrapper,
313
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
314
{
315
    /// Database for storing the state of all actions.
316
    #[metric(group = "action_db")]
317
    action_db: T,
318
319
    /// Maximum number of times a job can be retried.
320
    // TODO(allada) This should be a scheduler decorator instead
321
    // of always having it on every SimpleScheduler.
322
    #[metric(help = "Maximum number of times a job can be retried")]
323
    max_job_retries: usize,
324
325
    /// Duration after which an action is considered to be timed out if
326
    /// no event is received.
327
    #[metric(
328
        help = "Duration after which an action is considered to be timed out if no event is received"
329
    )]
330
    no_event_action_timeout: Duration,
331
332
    // A lock to ensure only one timeout operation is running at a time
333
    // on this service.
334
    timeout_operation_mux: Mutex<()>,
335
336
    /// Weak reference to self.
337
    // We use a weak reference to reduce the risk of a memory leak from
338
    // future changes. If this becomes some kind of performance issue,
339
    // we can consider using a strong reference.
340
    weak_self: Weak<Self>,
341
342
    /// Function to get the current time.
343
    now_fn: NowFn,
344
}
345
346
impl<T, I, NowFn> SimpleSchedulerStateManager<T, I, NowFn>
347
where
348
    T: AwaitedActionDb,
349
    I: InstantWrapper,
350
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
351
{
352
21
    pub fn new(
353
21
        max_job_retries: usize,
354
21
        no_event_action_timeout: Duration,
355
21
        action_db: T,
356
21
        now_fn: NowFn,
357
21
    ) -> Arc<Self> {
358
21
        Arc::new_cyclic(|weak_self| Self {
359
21
            action_db,
360
21
            max_job_retries,
361
21
            no_event_action_timeout,
362
21
            timeout_operation_mux: Mutex::new(()),
363
21
            weak_self: weak_self.clone(),
364
21
            now_fn,
365
21
        })
366
21
    }
367
368
    /// Let the scheduler know that an operation has timed out from
369
    /// the client side (i.e. the worker has not updated in a while).
370
1
    async fn timeout_operation_id(&self, operation_id: &OperationId) -> Result<(), Error> {
371
        // Ensure that only one timeout operation is running at a time.
372
        // Failing to do this could result in the same operation being
373
        // timed out multiple times at the same time.
374
        // Note: We could implement this on a per-operation_id basis, but it is quite
375
        // complex to manage the locks.
376
1
        let _lock = self.timeout_operation_mux.lock().
await0
;
377
378
1
        let awaited_action_subscriber = self
379
1
            .action_db
380
1
            .get_by_operation_id(operation_id)
381
0
            .await
382
1
            .err_tip(|| 
"In SimpleSchedulerStateManager::timeout_operation_id"0
)
?0
383
1
            .err_tip(|| {
384
0
                format!("Operation id {operation_id} does not exist in SimpleSchedulerStateManager::timeout_operation_id")
385
1
            })
?0
;
386
387
1
        let awaited_action = awaited_action_subscriber
388
1
            .borrow()
389
0
            .await
390
1
            .err_tip(|| 
"In SimpleSchedulerStateManager::timeout_operation_id"0
)
?0
;
391
392
        // If the action is not executing, we should not timeout the action.
393
1
        if !
matches!0
(awaited_action.state().stage, ActionStage::Executing) {
  Branch (393:12): [True: 0, False: 0]
  Branch (393:12): [True: 0, False: 0]
  Branch (393:12): [Folded - Ignored]
  Branch (393:12): [True: 0, False: 1]
  Branch (393:12): [True: 0, False: 0]
394
0
            return Ok(());
395
1
        }
396
397
1
        let last_worker_updated = awaited_action
398
1
            .last_worker_updated_timestamp()
399
1
            .duration_since(SystemTime::UNIX_EPOCH)
400
1
            .map_err(|e| {
401
0
                make_err!(
402
0
                    Code::Internal,
403
0
                    "Failed to convert last_worker_updated to duration since epoch {e:?}"
404
0
                )
405
1
            })
?0
;
406
1
        let worker_should_update_before = last_worker_updated
407
1
            .checked_add(self.no_event_action_timeout)
408
1
            .err_tip(|| 
"Timestamp too big in SimpleSchedulerStateManager::timeout_operation_id"0
)
?0
;
409
1
        if worker_should_update_before < (self.now_fn)().elapsed() {
  Branch (409:12): [True: 0, False: 0]
  Branch (409:12): [True: 0, False: 0]
  Branch (409:12): [Folded - Ignored]
  Branch (409:12): [True: 0, False: 1]
  Branch (409:12): [True: 0, False: 0]
410
            // The action was updated recently, we should not timeout the action.
411
            // This is to prevent timing out actions that have recently been updated
412
            // (e.g. multiple clients timing out the same action at the same time).
413
0
            return Ok(());
414
1
        }
415
1
416
1
        self.assign_operation(
417
1
            operation_id,
418
1
            Err(make_err!(
419
1
                Code::DeadlineExceeded,
420
1
                "Operation timed out after {} seconds",
421
1
                self.no_event_action_timeout.as_secs_f32(),
422
1
            )),
423
1
        )
424
0
        .await
425
1
    }
426
427
41
    async fn inner_update_operation(
428
41
        &self,
429
41
        operation_id: &OperationId,
430
41
        maybe_worker_id: Option<&WorkerId>,
431
41
        update: UpdateOperationType,
432
41
    ) -> Result<(), Error> {
433
41
        let mut last_err = None;
434
42
        for _ in 0..MAX_UPDATE_RETRIES {
435
42
            let maybe_awaited_action_subscriber = self
436
42
                .action_db
437
42
                .get_by_operation_id(operation_id)
438
0
                .await
439
42
                .err_tip(|| 
"In SimpleSchedulerStateManager::update_operation"0
)
?0
;
440
42
            let Some(
awaited_action_subscriber41
) = maybe_awaited_action_subscriber else {
  Branch (440:17): [True: 0, False: 0]
  Branch (440:17): [True: 0, False: 0]
  Branch (440:17): [Folded - Ignored]
  Branch (440:17): [True: 39, False: 0]
  Branch (440:17): [True: 2, False: 1]
441
                // No action found. It is ok if the action was not found. It
442
                // probably means that the action was dropped, but the worker was
443
                // still processing it.
444
1
                return Ok(());
445
            };
446
447
41
            let mut awaited_action = awaited_action_subscriber
448
41
                .borrow()
449
0
                .await
450
41
                .err_tip(|| 
"In SimpleSchedulerStateManager::update_operation"0
)
?0
;
451
452
            // Make sure the worker id matches the awaited action worker id.
453
            // This might happen if the worker sending the update is not the
454
            // worker that was assigned.
455
41
            if awaited_action.worker_id().is_some()
  Branch (455:16): [True: 0, False: 0]
  Branch (455:16): [True: 0, False: 0]
  Branch (455:16): [Folded - Ignored]
  Branch (455:16): [True: 13, False: 26]
  Branch (455:16): [True: 0, False: 2]
456
13
                && maybe_worker_id.is_some()
  Branch (456:20): [True: 0, False: 0]
  Branch (456:20): [True: 0, False: 0]
  Branch (456:20): [Folded - Ignored]
  Branch (456:20): [True: 12, False: 1]
  Branch (456:20): [True: 0, False: 0]
457
12
                && maybe_worker_id != awaited_action.worker_id().as_ref()
  Branch (457:20): [True: 0, False: 0]
  Branch (457:20): [True: 0, False: 0]
  Branch (457:20): [Folded - Ignored]
  Branch (457:20): [True: 0, False: 12]
  Branch (457:20): [True: 0, False: 0]
458
            {
459
                // If another worker is already assigned to the action, another
460
                // worker probably picked up the action. We should not update the
461
                // action in this case and abort this operation.
462
0
                let err = make_err!(
463
0
                    Code::Aborted,
464
0
                    "Worker ids do not match - {:?} != {:?} for {:?}",
465
0
                    maybe_worker_id,
466
0
                    awaited_action.worker_id(),
467
0
                    awaited_action,
468
0
                );
469
0
                event!(
470
0
                    Level::INFO,
471
0
                    "Worker ids do not match - {:?} != {:?} for {:?}. This is probably due to another worker picking up the action.",
472
0
                    maybe_worker_id,
473
0
                    awaited_action.worker_id(),
474
                    awaited_action,
475
                );
476
0
                return Err(err);
477
41
            }
478
41
479
41
            // Make sure we don't update an action that is already completed.
480
41
            if awaited_action.state().stage.is_finished() {
  Branch (480:16): [True: 0, False: 0]
  Branch (480:16): [True: 0, False: 0]
  Branch (480:16): [Folded - Ignored]
  Branch (480:16): [True: 0, False: 39]
  Branch (480:16): [True: 0, False: 2]
481
0
                return Err(make_err!(
482
0
                    Code::Internal,
483
0
                    "Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}",
484
0
                    awaited_action.state().stage,
485
0
                    maybe_worker_id,
486
0
                ));
487
41
            }
488
489
41
            let stage = match &update {
490
                UpdateOperationType::KeepAlive => {
491
0
                    awaited_action.keep_alive((self.now_fn)().now());
492
0
                    return self
493
0
                        .action_db
494
0
                        .update_awaited_action(awaited_action)
495
0
                        .await
496
0
                        .err_tip(|| "Failed to send KeepAlive in SimpleSchedulerStateManager::update_operation");
497
                }
498
33
                UpdateOperationType::UpdateWithActionStage(stage) => stage.clone(),
499
8
                UpdateOperationType::UpdateWithError(err) => {
500
8
                    // Don't count a backpressure failure as an attempt for an action.
501
8
                    let due_to_backpressure = err.code == Code::ResourceExhausted;
502
8
                    if !due_to_backpressure {
  Branch (502:24): [True: 0, False: 0]
  Branch (502:24): [True: 0, False: 0]
  Branch (502:24): [Folded - Ignored]
  Branch (502:24): [True: 8, False: 0]
  Branch (502:24): [True: 0, False: 0]
503
8
                        awaited_action.attempts += 1;
504
8
                    }
0
505
506
8
                    if awaited_action.attempts > self.max_job_retries {
  Branch (506:24): [True: 0, False: 0]
  Branch (506:24): [True: 0, False: 0]
  Branch (506:24): [Folded - Ignored]
  Branch (506:24): [True: 1, False: 7]
  Branch (506:24): [True: 0, False: 0]
507
1
                        ActionStage::Completed(ActionResult {
508
1
                            execution_metadata: ExecutionMetadata {
509
1
                                worker: maybe_worker_id.map_or_else(String::default, ToString::to_string),
510
1
                                ..ExecutionMetadata::default()
511
1
                            },
512
1
                            error: Some(err.clone().merge(make_err!(
513
1
                                Code::Internal,
514
1
                                "Job cancelled because it attempted to execute too many times {} > {} times {}",
515
1
                                awaited_action.attempts,
516
1
                                self.max_job_retries,
517
1
                                format!("for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"),
518
1
                            ))),
519
1
                            ..ActionResult::default()
520
1
                        })
521
                    } else {
522
7
                        ActionStage::Queued
523
                    }
524
                }
525
            };
526
41
            let now = (self.now_fn)().now();
527
41
            if 
matches!34
(stage, ActionStage::Queued) {
528
7
                // If the action is queued, we need to unset the worker id regardless of
529
7
                // which worker sent the update.
530
7
                awaited_action.set_worker_id(None, now);
531
34
            } else {
532
34
                awaited_action.set_worker_id(maybe_worker_id.copied(), now);
533
34
            }
534
41
            awaited_action.set_state(
535
41
                Arc::new(ActionState {
536
41
                    stage,
537
41
                    // Client id is not known here, it is the responsibility of
538
41
                    // the subscriber impl to replace this with the
539
41
                    // correct client id.
540
41
                    client_operation_id: operation_id.clone(),
541
41
                    action_digest: awaited_action.action_info().digest(),
542
41
                }),
543
41
                Some(now),
544
41
            );
545
546
41
            let update_action_result = self
547
41
                .action_db
548
41
                .update_awaited_action(awaited_action)
549
0
                .await
550
41
                .err_tip(|| 
"In SimpleSchedulerStateManager::update_operation"2
);
551
41
            if let Err(
err2
) = update_action_result {
  Branch (551:20): [True: 0, False: 0]
  Branch (551:20): [True: 0, False: 0]
  Branch (551:20): [Folded - Ignored]
  Branch (551:20): [True: 0, False: 39]
  Branch (551:20): [True: 2, False: 0]
552
                // We use Aborted to signal that the action was not
553
                // updated because the data being set was not the latest,
554
                // but that the update can be retried.
555
2
                if err.code == Code::Aborted {
  Branch (555:20): [True: 0, False: 0]
  Branch (555:20): [True: 0, False: 0]
  Branch (555:20): [Folded - Ignored]
  Branch (555:20): [True: 0, False: 0]
  Branch (555:20): [True: 1, False: 1]
556
1
                    last_err = Some(err);
557
1
                    continue;
558
                } else {
559
1
                    return Err(err);
560
                }
561
39
            }
562
39
            return Ok(());
563
        }
564
0
        match last_err {
565
0
            Some(err) => Err(err),
566
0
            None => Err(make_err!(
567
0
                Code::Internal,
568
0
                "Failed to update action after {} retries with no error set",
569
0
                MAX_UPDATE_RETRIES,
570
0
            )),
571
        }
572
41
    }
573
574
25
    async fn inner_add_operation(
575
25
        &self,
576
25
        new_client_operation_id: OperationId,
577
25
        action_info: Arc<ActionInfo>,
578
25
    ) -> Result<T::Subscriber, Error> {
579
25
        self.action_db
580
25
            .add_action(new_client_operation_id, action_info)
581
0
            .await
582
25
            .err_tip(|| 
"In SimpleSchedulerStateManager::add_operation"0
)
583
25
    }
584
585
594
    async fn inner_filter_operations<'a, F>(
586
594
        &'a self,
587
594
        filter: OperationFilter,
588
594
        to_action_state_result: F,
589
594
    ) -> Result<ActionStateResultStream<'a>, Error>
590
594
    where
591
594
        F: Fn(T::Subscriber) -> Box<dyn ActionStateResult> + Send + Sync + 'a,
592
594
    {
593
91
        fn sorted_awaited_action_state_for_flags(
594
91
            stage: OperationStageFlags,
595
91
        ) -> Option<SortedAwaitedActionState> {
596
91
            match stage {
597
0
                OperationStageFlags::CacheCheck => Some(SortedAwaitedActionState::CacheCheck),
598
91
                OperationStageFlags::Queued => Some(SortedAwaitedActionState::Queued),
599
0
                OperationStageFlags::Executing => Some(SortedAwaitedActionState::Executing),
600
0
                OperationStageFlags::Completed => Some(SortedAwaitedActionState::Completed),
601
0
                _ => None,
602
            }
603
91
        }
604
605
594
        if let Some(
operation_id0
) = &filter.operation_id {
  Branch (605:16): [True: 0, False: 0]
  Branch (605:16): [True: 0, False: 0]
  Branch (605:16): [True: 0, False: 0]
  Branch (605:16): [True: 0, False: 0]
  Branch (605:16): [Folded - Ignored]
  Branch (605:16): [True: 0, False: 503]
  Branch (605:16): [True: 0, False: 87]
  Branch (605:16): [True: 0, False: 0]
  Branch (605:16): [True: 0, False: 4]
606
0
            let maybe_subscriber = self
607
0
                .action_db
608
0
                .get_by_operation_id(operation_id)
609
0
                .await
610
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
611
0
            let Some(subscriber) = maybe_subscriber else {
  Branch (611:17): [True: 0, False: 0]
  Branch (611:17): [True: 0, False: 0]
  Branch (611:17): [True: 0, False: 0]
  Branch (611:17): [True: 0, False: 0]
  Branch (611:17): [Folded - Ignored]
  Branch (611:17): [True: 0, False: 0]
  Branch (611:17): [True: 0, False: 0]
  Branch (611:17): [True: 0, False: 0]
  Branch (611:17): [True: 0, False: 0]
612
0
                return Ok(Box::pin(stream::empty()));
613
            };
614
0
            let awaited_action = subscriber
615
0
                .borrow()
616
0
                .await
617
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
618
0
            if !apply_filter_predicate(&awaited_action, &filter) {
  Branch (618:16): [True: 0, False: 0]
  Branch (618:16): [True: 0, False: 0]
  Branch (618:16): [True: 0, False: 0]
  Branch (618:16): [True: 0, False: 0]
  Branch (618:16): [Folded - Ignored]
  Branch (618:16): [True: 0, False: 0]
  Branch (618:16): [True: 0, False: 0]
  Branch (618:16): [True: 0, False: 0]
  Branch (618:16): [True: 0, False: 0]
619
0
                return Ok(Box::pin(stream::empty()));
620
0
            }
621
0
            return Ok(Box::pin(stream::once(async move {
622
0
                to_action_state_result(subscriber)
623
0
            })));
624
594
        }
625
594
        if let Some(
client_operation_id503
) = &filter.client_operation_id {
  Branch (625:16): [True: 0, False: 0]
  Branch (625:16): [True: 0, False: 0]
  Branch (625:16): [True: 0, False: 0]
  Branch (625:16): [True: 0, False: 0]
  Branch (625:16): [Folded - Ignored]
  Branch (625:16): [True: 503, False: 0]
  Branch (625:16): [True: 0, False: 87]
  Branch (625:16): [True: 0, False: 0]
  Branch (625:16): [True: 0, False: 4]
626
503
            let maybe_subscriber = self
627
503
                .action_db
628
503
                .get_awaited_action_by_id(client_operation_id)
629
0
                .await
630
503
                .err_tip(|| 
"In SimpleSchedulerStateManager::filter_operations"0
)
?0
;
631
503
            let Some(
subscriber3
) = maybe_subscriber else {
  Branch (631:17): [True: 0, False: 0]
  Branch (631:17): [True: 0, False: 0]
  Branch (631:17): [True: 0, False: 0]
  Branch (631:17): [True: 0, False: 0]
  Branch (631:17): [Folded - Ignored]
  Branch (631:17): [True: 3, False: 500]
  Branch (631:17): [True: 0, False: 0]
  Branch (631:17): [True: 0, False: 0]
  Branch (631:17): [True: 0, False: 0]
632
500
                return Ok(Box::pin(stream::empty()));
633
            };
634
3
            let awaited_action = subscriber
635
3
                .borrow()
636
0
                .await
637
3
                .err_tip(|| 
"In SimpleSchedulerStateManager::filter_operations"0
)
?0
;
638
3
            if !apply_filter_predicate(&awaited_action, &filter) {
  Branch (638:16): [True: 0, False: 0]
  Branch (638:16): [True: 0, False: 0]
  Branch (638:16): [True: 0, False: 0]
  Branch (638:16): [True: 0, False: 0]
  Branch (638:16): [Folded - Ignored]
  Branch (638:16): [True: 0, False: 3]
  Branch (638:16): [True: 0, False: 0]
  Branch (638:16): [True: 0, False: 0]
  Branch (638:16): [True: 0, False: 0]
639
0
                return Ok(Box::pin(stream::empty()));
640
3
            }
641
3
            return Ok(Box::pin(stream::once(async move {
642
3
                to_action_state_result(subscriber)
643
3
            })));
644
91
        }
645
646
91
        let Some(sorted_awaited_action_state) =
  Branch (646:13): [True: 0, False: 0]
  Branch (646:13): [True: 0, False: 0]
  Branch (646:13): [True: 0, False: 0]
  Branch (646:13): [True: 0, False: 0]
  Branch (646:13): [Folded - Ignored]
  Branch (646:13): [True: 0, False: 0]
  Branch (646:13): [True: 87, False: 0]
  Branch (646:13): [True: 0, False: 0]
  Branch (646:13): [True: 4, False: 0]
647
91
            sorted_awaited_action_state_for_flags(filter.stages)
648
        else {
649
0
            let mut all_items: Vec<_> = self
650
0
                .action_db
651
0
                .get_all_awaited_actions()
652
0
                .await
653
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?
654
0
                .and_then(|awaited_action_subscriber| async move {
655
0
                    let awaited_action = awaited_action_subscriber
656
0
                        .borrow()
657
0
                        .await
658
0
                        .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
659
0
                    Ok((awaited_action_subscriber, awaited_action))
660
0
                })
661
0
                .try_filter_map(|(subscriber, awaited_action)| {
662
0
                    if apply_filter_predicate(&awaited_action, &filter) {
  Branch (662:24): [True: 0, False: 0]
  Branch (662:24): [True: 0, False: 0]
  Branch (662:24): [True: 0, False: 0]
  Branch (662:24): [True: 0, False: 0]
  Branch (662:24): [Folded - Ignored]
  Branch (662:24): [True: 0, False: 0]
  Branch (662:24): [True: 0, False: 0]
  Branch (662:24): [True: 0, False: 0]
  Branch (662:24): [True: 0, False: 0]
663
0
                        future::ready(Ok(Some((subscriber, awaited_action.sort_key()))))
664
0
                            .left_future()
665
                    } else {
666
0
                        future::ready(Result::<_, Error>::Ok(None)).right_future()
667
                    }
668
0
                })
669
0
                .try_collect()
670
0
                .await
671
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
672
0
            match filter.order_by_priority_direction {
673
0
                Some(OrderDirection::Asc) => all_items.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)),
674
0
                Some(OrderDirection::Desc) => all_items.sort_unstable_by(|(_, a), (_, b)| b.cmp(a)),
675
0
                None => {}
676
            }
677
0
            return Ok(Box::pin(stream::iter(
678
0
                all_items
679
0
                    .into_iter()
680
0
                    .map(move |(subscriber, _)| to_action_state_result(subscriber)),
681
0
            )));
682
        };
683
684
91
        let desc = 
matches!0
(
685
91
            filter.order_by_priority_direction,
686
            Some(OrderDirection::Desc)
687
        );
688
91
        let stream = self
689
91
            .action_db
690
91
            .get_range_of_actions(
691
91
                sorted_awaited_action_state,
692
91
                Bound::Unbounded,
693
91
                Bound::Unbounded,
694
91
                desc,
695
91
            )
696
0
            .await
697
91
            .err_tip(|| 
"In SimpleSchedulerStateManager::filter_operations"0
)
?0
698
91
            .and_then(|awaited_action_subscriber| async move 
{42
699
42
                let awaited_action = awaited_action_subscriber
700
42
                    .borrow()
701
0
                    .await
702
42
                    .err_tip(|| 
"In SimpleSchedulerStateManager::filter_operations"0
)
?0
;
703
42
                Ok((awaited_action_subscriber, awaited_action))
704
91
            
}84
)
705
91
            .try_filter_map(move |(subscriber, awaited_action)| {
706
42
                if apply_filter_predicate(&awaited_action, &filter) {
  Branch (706:20): [True: 0, False: 0]
  Branch (706:20): [True: 0, False: 0]
  Branch (706:20): [True: 0, False: 0]
  Branch (706:20): [True: 0, False: 0]
  Branch (706:20): [Folded - Ignored]
  Branch (706:20): [True: 0, False: 0]
  Branch (706:20): [True: 40, False: 0]
  Branch (706:20): [True: 0, False: 0]
  Branch (706:20): [True: 2, False: 0]
707
42
                    future::ready(Ok(Some(subscriber))).left_future()
708
                } else {
709
0
                    future::ready(Result::<_, Error>::Ok(None)).right_future()
710
                }
711
91
            
}42
)
712
91
            .map(move |result| -> Box<dyn ActionStateResult> {
713
42
                result.map_or_else(
714
42
                    |e| -> Box<dyn ActionStateResult> 
{ Box::new(ErrorActionStateResult(e)) }0
,
715
42
                    |v| -> Box<dyn ActionStateResult> { to_action_state_result(v) },
716
42
                )
717
91
            });
718
91
        Ok(Box::pin(stream))
719
594
    }
720
}
721
722
#[async_trait]
723
impl<T, I, NowFn> ClientStateManager for SimpleSchedulerStateManager<T, I, NowFn>
724
where
725
    T: AwaitedActionDb,
726
    I: InstantWrapper,
727
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
728
{
729
    async fn add_action(
730
        &self,
731
        client_operation_id: OperationId,
732
        action_info: Arc<ActionInfo>,
733
25
    ) -> Result<Box<dyn ActionStateResult>, Error> {
734
25
        let sub = self
735
25
            .inner_add_operation(client_operation_id.clone(), action_info.clone())
736
0
            .await?;
737
738
25
        Ok(Box::new(ClientActionStateResult::new(
739
25
            sub,
740
25
            self.weak_self.clone(),
741
25
            self.no_event_action_timeout,
742
25
            self.now_fn.clone(),
743
25
        )))
744
50
    }
745
746
    async fn filter_operations<'a>(
747
        &'a self,
748
        filter: OperationFilter,
749
503
    ) -> Result<ActionStateResultStream<'a>, Error> {
750
503
        self.inner_filter_operations(filter, move |rx| {
751
3
            Box::new(ClientActionStateResult::new(
752
3
                rx,
753
3
                self.weak_self.clone(),
754
3
                self.no_event_action_timeout,
755
3
                self.now_fn.clone(),
756
3
            ))
757
503
        })
758
0
        .await
759
1.00k
    }
760
761
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
762
0
        None
763
0
    }
764
}
765
766
#[async_trait]
767
impl<T, I, NowFn> WorkerStateManager for SimpleSchedulerStateManager<T, I, NowFn>
768
where
769
    T: AwaitedActionDb,
770
    I: InstantWrapper,
771
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
772
{
773
    async fn update_operation(
774
        &self,
775
        operation_id: &OperationId,
776
        worker_id: &WorkerId,
777
        update: UpdateOperationType,
778
12
    ) -> Result<(), Error> {
779
12
        self.inner_update_operation(operation_id, Some(worker_id), update)
780
0
            .await
781
24
    }
782
}
783
784
#[async_trait]
785
impl<T, I, NowFn> MatchingEngineStateManager for SimpleSchedulerStateManager<T, I, NowFn>
786
where
787
    T: AwaitedActionDb,
788
    I: InstantWrapper,
789
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
790
{
791
    async fn filter_operations<'a>(
792
        &'a self,
793
        filter: OperationFilter,
794
91
    ) -> Result<ActionStateResultStream<'a>, Error> {
795
91
        self.inner_filter_operations(filter, |rx| {
796
42
            Box::new(MatchingEngineActionStateResult::new(
797
42
                rx,
798
42
                self.weak_self.clone(),
799
42
                self.no_event_action_timeout,
800
42
                self.now_fn.clone(),
801
42
            ))
802
91
        })
803
0
        .await
804
182
    }
805
806
    async fn assign_operation(
807
        &self,
808
        operation_id: &OperationId,
809
        worker_id_or_reason_for_unsassign: Result<&WorkerId, Error>,
810
29
    ) -> Result<(), Error> {
811
29
        let (maybe_worker_id, update) = match worker_id_or_reason_for_unsassign {
812
28
            Ok(worker_id) => (
813
28
                Some(worker_id),
814
28
                UpdateOperationType::UpdateWithActionStage(ActionStage::Executing),
815
28
            ),
816
1
            Err(err) => (None, UpdateOperationType::UpdateWithError(err)),
817
        };
818
29
        self.inner_update_operation(operation_id, maybe_worker_id, update)
819
0
            .await
820
58
    }
821
}