Coverage Report

Created: 2024-11-20 10:13

/build/source/nativelink-scheduler/src/simple_scheduler_state_manager.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::ops::Bound;
16
use std::string::ToString;
17
use std::sync::{Arc, Weak};
18
use std::time::{Duration, SystemTime};
19
20
use async_lock::Mutex;
21
use async_trait::async_trait;
22
use futures::{stream, StreamExt, TryStreamExt};
23
use nativelink_error::{make_err, Code, Error, ResultExt};
24
use nativelink_metric::MetricsComponent;
25
use nativelink_util::action_messages::{
26
    ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ExecutionMetadata,
27
    OperationId, WorkerId,
28
};
29
use nativelink_util::instant_wrapper::InstantWrapper;
30
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
31
use nativelink_util::operation_state_manager::{
32
    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
33
    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType, WorkerStateManager,
34
};
35
use tracing::{event, Level};
36
37
use super::awaited_action_db::{
38
    AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedActionState,
39
};
40
41
/// Maximum number of times an update to the database
42
/// can fail before giving up.
43
const MAX_UPDATE_RETRIES: usize = 5;
44
45
/// Simple struct that implements the ActionStateResult trait and always returns an error.
46
struct ErrorActionStateResult(Error);
47
48
#[async_trait]
49
impl ActionStateResult for ErrorActionStateResult {
50
0
    async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
51
0
        Err(self.0.clone())
52
0
    }
53
54
0
    async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
55
0
        Err(self.0.clone())
56
0
    }
57
58
0
    async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
59
0
        Err(self.0.clone())
60
0
    }
61
}
62
63
struct ClientActionStateResult<U, T, I, NowFn>
64
where
65
    U: AwaitedActionSubscriber,
66
    T: AwaitedActionDb,
67
    I: InstantWrapper,
68
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
69
{
70
    inner: MatchingEngineActionStateResult<U, T, I, NowFn>,
71
}
72
73
impl<U, T, I, NowFn> ClientActionStateResult<U, T, I, NowFn>
74
where
75
    U: AwaitedActionSubscriber,
76
    T: AwaitedActionDb,
77
    I: InstantWrapper,
78
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
79
{
80
28
    fn new(
81
28
        sub: U,
82
28
        simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
83
28
        no_event_action_timeout: Duration,
84
28
        now_fn: NowFn,
85
28
    ) -> Self {
86
28
        Self {
87
28
            inner: MatchingEngineActionStateResult::new(
88
28
                sub,
89
28
                simple_scheduler_state_manager,
90
28
                no_event_action_timeout,
91
28
                now_fn,
92
28
            ),
93
28
        }
94
28
    }
95
}
96
97
#[async_trait]
98
impl<U, T, I, NowFn> ActionStateResult for ClientActionStateResult<U, T, I, NowFn>
99
where
100
    U: AwaitedActionSubscriber,
101
    T: AwaitedActionDb,
102
    I: InstantWrapper,
103
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
104
{
105
3
    async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
106
3
        self.inner.as_state().
await0
107
6
    }
108
109
44
    async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
110
1.49k
        
self.inner.changed()44
.await
111
86
    }
112
113
0
    async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
114
0
        self.inner.as_action_info().await
115
0
    }
116
}
117
118
struct MatchingEngineActionStateResult<U, T, I, NowFn>
119
where
120
    U: AwaitedActionSubscriber,
121
    T: AwaitedActionDb,
122
    I: InstantWrapper,
123
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
124
{
125
    awaited_action_sub: U,
126
    simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
127
    no_event_action_timeout: Duration,
128
    now_fn: NowFn,
129
}
130
impl<U, T, I, NowFn> MatchingEngineActionStateResult<U, T, I, NowFn>
131
where
132
    U: AwaitedActionSubscriber,
133
    T: AwaitedActionDb,
134
    I: InstantWrapper,
135
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
136
{
137
70
    fn new(
138
70
        awaited_action_sub: U,
139
70
        simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
140
70
        no_event_action_timeout: Duration,
141
70
        now_fn: NowFn,
142
70
    ) -> Self {
143
70
        Self {
144
70
            awaited_action_sub,
145
70
            simple_scheduler_state_manager,
146
70
            no_event_action_timeout,
147
70
            now_fn,
148
70
        }
149
70
    }
150
}
151
152
#[async_trait]
153
impl<U, T, I, NowFn> ActionStateResult for MatchingEngineActionStateResult<U, T, I, NowFn>
154
where
155
    U: AwaitedActionSubscriber,
156
    T: AwaitedActionDb,
157
    I: InstantWrapper,
158
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
159
{
160
31
    async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
161
31
        Ok(self
162
31
            .awaited_action_sub
163
31
            .borrow()
164
0
            .await
165
31
            .err_tip(|| 
"In MatchingEngineActionStateResult::as_state"0
)
?0
166
31
            .state()
167
31
            .clone())
168
62
    }
169
170
44
    async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
171
44
        let mut timeout_attempts = 0;
172
        loop {
173
54
            tokio::select! {
174
54
                
awaited_action_result42
= self.awaited_action_sub.changed() => {
175
42
                    return awaited_action_result
176
42
                        .err_tip(|| 
"In MatchingEngineActionStateResult::changed"0
)
177
42
                        .map(|v| v.state().clone());
178
                }
179
54
                _ = (self.now_fn)().sleep(self.no_event_action_timeout) => {
180
10
                    // Timeout happened, do additional checks below.
181
10
                }
182
            }
183
184
10
            let awaited_action = self
185
10
                .awaited_action_sub
186
10
                .borrow()
187
0
                .await
188
10
                .err_tip(|| 
"In MatchingEngineActionStateResult::changed"0
)
?0
;
189
190
10
            if 
matches!1
(awaited_action.state().stage, ActionStage::Queued) {
191
                // Actions in queued state do not get periodically updated,
192
                // so we don't need to timeout them.
193
9
                continue;
194
1
            }
195
196
1
            let simple_scheduler_state_manager = self
197
1
                .simple_scheduler_state_manager
198
1
                .upgrade()
199
1
                .err_tip(|| 
format!("Failed to upgrade weak reference to SimpleSchedulerStateManager in MatchingEngineActionStateResult::changed at attempt: {timeout_attempts}")0
)
?0
;
200
201
1
            event!(
202
1
                Level::WARN,
203
                ?awaited_action,
204
1
                "OperationId {} / {} timed out after {} seconds issuing a retry",
205
1
                awaited_action.operation_id(),
206
1
                awaited_action.state().client_operation_id,
207
1
                self.no_event_action_timeout.as_secs_f32(),
208
            );
209
210
1
            simple_scheduler_state_manager
211
1
                .timeout_operation_id(awaited_action.operation_id())
212
0
                .await
213
1
                .err_tip(|| 
"In MatchingEngineActionStateResult::changed"0
)
?0
;
214
215
1
            if timeout_attempts >= MAX_UPDATE_RETRIES {
  Branch (215:16): [True: 0, False: 0]
  Branch (215:16): [True: 0, False: 0]
  Branch (215:16): [Folded - Ignored]
  Branch (215:16): [True: 0, False: 1]
  Branch (215:16): [True: 0, False: 0]
216
0
                return Err(make_err!(
217
0
                    Code::Internal,
218
0
                    "Failed to update action after {} retries with no error set in MatchingEngineActionStateResult::changed - {} {:?}",
219
0
                    MAX_UPDATE_RETRIES,
220
0
                    awaited_action.operation_id(),
221
0
                    awaited_action.state().stage,
222
0
                ));
223
1
            }
224
1
            timeout_attempts += 1;
225
        }
226
86
    }
227
228
42
    async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
229
42
        Ok(self
230
42
            .awaited_action_sub
231
42
            .borrow()
232
0
            .await
233
42
            .err_tip(|| 
"In MatchingEngineActionStateResult::as_action_info"0
)
?0
234
42
            .action_info()
235
42
            .clone())
236
84
    }
237
}
238
239
/// SimpleSchedulerStateManager is responsible for maintaining the state of the scheduler.
240
/// Scheduler state includes the actions that are queued, active, and recently completed.
241
/// It also includes the workers that are available to execute actions based on allocation
242
/// strategy.
243
0
#[derive(MetricsComponent)]
244
pub struct SimpleSchedulerStateManager<T, I, NowFn>
245
where
246
    T: AwaitedActionDb,
247
    I: InstantWrapper,
248
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
249
{
250
    /// Database for storing the state of all actions.
251
    #[metric(group = "action_db")]
252
    action_db: T,
253
254
    /// Maximum number of times a job can be retried.
255
    // TODO(allada) This should be a scheduler decorator instead
256
    // of always having it on every SimpleScheduler.
257
    #[metric(help = "Maximum number of times a job can be retried")]
258
    max_job_retries: usize,
259
260
    /// Duration after which an action is considered to be timed out if
261
    /// no event is received.
262
    #[metric(
263
        help = "Duration after which an action is considered to be timed out if no event is received"
264
    )]
265
    no_event_action_timeout: Duration,
266
267
    /// Mark operation as timed out if the worker has not updated in this duration.
268
    /// This is used to prevent operations from being stuck in the queue forever
269
    /// if it is not being processed by any worker.
270
    client_action_timeout: Duration,
271
272
    // A lock to ensure only one timeout operation is running at a time
273
    // on this service.
274
    timeout_operation_mux: Mutex<()>,
275
276
    /// Weak reference to self.
277
    // We use a weak reference to reduce the risk of a memory leak from
278
    // future changes. If this becomes some kind of perforamnce issue,
279
    // we can consider using a strong reference.
280
    weak_self: Weak<Self>,
281
282
    /// Function to get the current time.
283
    now_fn: NowFn,
284
}
285
286
impl<T, I, NowFn> SimpleSchedulerStateManager<T, I, NowFn>
287
where
288
    T: AwaitedActionDb,
289
    I: InstantWrapper,
290
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
291
{
292
21
    pub fn new(
293
21
        max_job_retries: usize,
294
21
        no_event_action_timeout: Duration,
295
21
        client_action_timeout: Duration,
296
21
        action_db: T,
297
21
        now_fn: NowFn,
298
21
    ) -> Arc<Self> {
299
21
        Arc::new_cyclic(|weak_self| Self {
300
21
            action_db,
301
21
            max_job_retries,
302
21
            no_event_action_timeout,
303
21
            client_action_timeout,
304
21
            timeout_operation_mux: Mutex::new(()),
305
21
            weak_self: weak_self.clone(),
306
21
            now_fn,
307
21
        })
308
21
    }
309
310
45
    async fn apply_filter_predicate(
311
45
        &self,
312
45
        awaited_action: &AwaitedAction,
313
45
        filter: &OperationFilter,
314
45
    ) -> bool {
315
45
        // Note: The caller must filter `client_operation_id`.
316
45
317
45
        if awaited_action.last_client_keepalive_timestamp() + self.client_action_timeout
  Branch (317:12): [True: 0, False: 0]
  Branch (317:12): [True: 0, False: 0]
  Branch (317:12): [Folded - Ignored]
  Branch (317:12): [True: 0, False: 43]
  Branch (317:12): [True: 0, False: 2]
318
45
            < (self.now_fn)().now()
319
        {
320
0
            if !awaited_action.state().stage.is_finished() {
  Branch (320:16): [True: 0, False: 0]
  Branch (320:16): [True: 0, False: 0]
  Branch (320:16): [Folded - Ignored]
  Branch (320:16): [True: 0, False: 0]
  Branch (320:16): [True: 0, False: 0]
321
0
                let mut state = awaited_action.state().as_ref().clone();
322
0
                state.stage = ActionStage::Completed(ActionResult {
323
0
                    error: Some(make_err!(
324
0
                        Code::DeadlineExceeded,
325
0
                        "Operation timed out {} seconds of having no more clients listening",
326
0
                        self.client_action_timeout.as_secs_f32(),
327
0
                    )),
328
0
                    ..ActionResult::default()
329
0
                });
330
0
                let mut new_awaited_action = awaited_action.clone();
331
0
                new_awaited_action.worker_set_state(Arc::new(state), (self.now_fn)().now());
332
0
                if let Err(err) = self
  Branch (332:24): [True: 0, False: 0]
  Branch (332:24): [True: 0, False: 0]
  Branch (332:24): [Folded - Ignored]
  Branch (332:24): [True: 0, False: 0]
  Branch (332:24): [True: 0, False: 0]
333
0
                    .action_db
334
0
                    .update_awaited_action(new_awaited_action)
335
0
                    .await
336
                {
337
0
                    event!(
338
0
                        Level::WARN,
339
0
                        "Failed to update action to timed out state after client keepalive timeout. This is ok if multiple schedulers tried to set the state at the same time: {err}",
340
                    );
341
0
                }
342
0
            }
343
0
            return false;
344
45
        }
345
346
45
        if let Some(
operation_id0
) = &filter.operation_id {
  Branch (346:16): [True: 0, False: 0]
  Branch (346:16): [True: 0, False: 0]
  Branch (346:16): [Folded - Ignored]
  Branch (346:16): [True: 0, False: 43]
  Branch (346:16): [True: 0, False: 2]
347
0
            if operation_id != awaited_action.operation_id() {
  Branch (347:16): [True: 0, False: 0]
  Branch (347:16): [True: 0, False: 0]
  Branch (347:16): [Folded - Ignored]
  Branch (347:16): [True: 0, False: 0]
  Branch (347:16): [True: 0, False: 0]
348
0
                return false;
349
0
            }
350
45
        }
351
352
45
        if filter.worker_id.is_some() && 
filter.worker_id != awaited_action.worker_id()0
{
  Branch (352:12): [True: 0, False: 0]
  Branch (352:42): [True: 0, False: 0]
  Branch (352:12): [True: 0, False: 0]
  Branch (352:42): [True: 0, False: 0]
  Branch (352:12): [Folded - Ignored]
  Branch (352:42): [Folded - Ignored]
  Branch (352:12): [True: 0, False: 43]
  Branch (352:42): [True: 0, False: 0]
  Branch (352:12): [True: 0, False: 2]
  Branch (352:42): [True: 0, False: 0]
353
0
            return false;
354
45
        }
355
356
        {
357
45
            if let Some(
filter_unique_key0
) = &filter.unique_key {
  Branch (357:20): [True: 0, False: 0]
  Branch (357:20): [True: 0, False: 0]
  Branch (357:20): [Folded - Ignored]
  Branch (357:20): [True: 0, False: 43]
  Branch (357:20): [True: 0, False: 2]
358
0
                match &awaited_action.action_info().unique_qualifier {
359
0
                    ActionUniqueQualifier::Cachable(unique_key) => {
360
0
                        if filter_unique_key != unique_key {
  Branch (360:28): [True: 0, False: 0]
  Branch (360:28): [True: 0, False: 0]
  Branch (360:28): [Folded - Ignored]
  Branch (360:28): [True: 0, False: 0]
  Branch (360:28): [True: 0, False: 0]
361
0
                            return false;
362
0
                        }
363
                    }
364
                    ActionUniqueQualifier::Uncachable(_) => {
365
0
                        return false;
366
                    }
367
                }
368
45
            }
369
45
            if let Some(
action_digest0
) = filter.action_digest {
  Branch (369:20): [True: 0, False: 0]
  Branch (369:20): [True: 0, False: 0]
  Branch (369:20): [Folded - Ignored]
  Branch (369:20): [True: 0, False: 43]
  Branch (369:20): [True: 0, False: 2]
370
0
                if action_digest != awaited_action.action_info().digest() {
  Branch (370:20): [True: 0, False: 0]
  Branch (370:20): [True: 0, False: 0]
  Branch (370:20): [Folded - Ignored]
  Branch (370:20): [True: 0, False: 0]
  Branch (370:20): [True: 0, False: 0]
371
0
                    return false;
372
0
                }
373
45
            }
374
        }
375
376
        {
377
45
            let last_worker_update_timestamp = awaited_action.last_worker_updated_timestamp();
378
45
            if let Some(
worker_update_before0
) = filter.worker_update_before {
  Branch (378:20): [True: 0, False: 0]
  Branch (378:20): [True: 0, False: 0]
  Branch (378:20): [Folded - Ignored]
  Branch (378:20): [True: 0, False: 43]
  Branch (378:20): [True: 0, False: 2]
379
0
                if worker_update_before < last_worker_update_timestamp {
  Branch (379:20): [True: 0, False: 0]
  Branch (379:20): [True: 0, False: 0]
  Branch (379:20): [Folded - Ignored]
  Branch (379:20): [True: 0, False: 0]
  Branch (379:20): [True: 0, False: 0]
380
0
                    return false;
381
0
                }
382
45
            }
383
45
            if let Some(
completed_before0
) = filter.completed_before {
  Branch (383:20): [True: 0, False: 0]
  Branch (383:20): [True: 0, False: 0]
  Branch (383:20): [Folded - Ignored]
  Branch (383:20): [True: 0, False: 43]
  Branch (383:20): [True: 0, False: 2]
384
0
                if awaited_action.state().stage.is_finished()
  Branch (384:20): [True: 0, False: 0]
  Branch (384:20): [True: 0, False: 0]
  Branch (384:20): [Folded - Ignored]
  Branch (384:20): [True: 0, False: 0]
  Branch (384:20): [True: 0, False: 0]
385
0
                    && completed_before < last_worker_update_timestamp
  Branch (385:24): [True: 0, False: 0]
  Branch (385:24): [True: 0, False: 0]
  Branch (385:24): [Folded - Ignored]
  Branch (385:24): [True: 0, False: 0]
  Branch (385:24): [True: 0, False: 0]
386
                {
387
0
                    return false;
388
0
                }
389
45
            }
390
45
            if filter.stages != OperationStageFlags::Any {
  Branch (390:16): [True: 0, False: 0]
  Branch (390:16): [True: 0, False: 0]
  Branch (390:16): [Folded - Ignored]
  Branch (390:16): [True: 40, False: 3]
  Branch (390:16): [True: 2, False: 0]
391
42
                let stage_flag = match awaited_action.state().stage {
392
0
                    ActionStage::Unknown => OperationStageFlags::Any,
393
0
                    ActionStage::CacheCheck => OperationStageFlags::CacheCheck,
394
42
                    ActionStage::Queued => OperationStageFlags::Queued,
395
0
                    ActionStage::Executing => OperationStageFlags::Executing,
396
                    ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => {
397
0
                        OperationStageFlags::Completed
398
                    }
399
                };
400
42
                if !filter.stages.intersects(stage_flag) {
  Branch (400:20): [True: 0, False: 0]
  Branch (400:20): [True: 0, False: 0]
  Branch (400:20): [Folded - Ignored]
  Branch (400:20): [True: 0, False: 40]
  Branch (400:20): [True: 0, False: 2]
401
0
                    return false;
402
42
                }
403
3
            }
404
        }
405
406
45
        true
407
45
    }
408
409
    /// Let the scheduler know that an operation has timed out from
410
    /// the client side (ie: worker has not updated in a while).
411
1
    async fn timeout_operation_id(&self, operation_id: &OperationId) -> Result<(), Error> {
412
        // Ensure that only one timeout operation is running at a time.
413
        // Failing to do this could result in the same operation being
414
        // timed out multiple times at the same time.
415
        // Note: We could implement this on a per-operation_id basis, but it is quite
416
        // complex to manage the locks.
417
1
        let _lock = self.timeout_operation_mux.lock().
await0
;
418
419
1
        let awaited_action_subscriber = self
420
1
            .action_db
421
1
            .get_by_operation_id(operation_id)
422
0
            .await
423
1
            .err_tip(|| 
"In SimpleSchedulerStateManager::timeout_operation_id"0
)
?0
424
1
            .err_tip(|| {
425
0
                format!("Operation id {operation_id} does not exist in SimpleSchedulerStateManager::timeout_operation_id")
426
1
            })
?0
;
427
428
1
        let awaited_action = awaited_action_subscriber
429
1
            .borrow()
430
0
            .await
431
1
            .err_tip(|| 
"In SimpleSchedulerStateManager::timeout_operation_id"0
)
?0
;
432
433
        // If the action is not executing, we should not timeout the action.
434
1
        if !
matches!0
(awaited_action.state().stage, ActionStage::Executing) {
  Branch (434:12): [True: 0, False: 0]
  Branch (434:12): [True: 0, False: 0]
  Branch (434:12): [Folded - Ignored]
  Branch (434:12): [True: 0, False: 1]
  Branch (434:12): [True: 0, False: 0]
435
0
            return Ok(());
436
1
        }
437
438
1
        let last_worker_updated = awaited_action
439
1
            .last_worker_updated_timestamp()
440
1
            .duration_since(SystemTime::UNIX_EPOCH)
441
1
            .map_err(|e| {
442
0
                make_err!(
443
0
                    Code::Internal,
444
0
                    "Failed to convert last_worker_updated to duration since epoch {e:?}"
445
0
                )
446
1
            })
?0
;
447
1
        let worker_should_update_before = last_worker_updated
448
1
            .checked_add(self.no_event_action_timeout)
449
1
            .err_tip(|| 
"Timestamp too big in SimpleSchedulerStateManager::timeout_operation_id"0
)
?0
;
450
1
        if worker_should_update_before < (self.now_fn)().elapsed() {
  Branch (450:12): [True: 0, False: 0]
  Branch (450:12): [True: 0, False: 0]
  Branch (450:12): [Folded - Ignored]
  Branch (450:12): [True: 0, False: 1]
  Branch (450:12): [True: 0, False: 0]
451
            // The action was updated recently, we should not timeout the action.
452
            // This is to prevent timing out actions that have recently been updated
453
            // (like multiple clients timeout the same action at the same time).
454
0
            return Ok(());
455
1
        }
456
1
457
1
        self.assign_operation(
458
1
            operation_id,
459
1
            Err(make_err!(
460
1
                Code::DeadlineExceeded,
461
1
                "Operation timed out after {} seconds",
462
1
                self.no_event_action_timeout.as_secs_f32(),
463
1
            )),
464
1
        )
465
0
        .await
466
1
    }
467
468
41
    async fn inner_update_operation(
469
41
        &self,
470
41
        operation_id: &OperationId,
471
41
        maybe_worker_id: Option<&WorkerId>,
472
41
        update: UpdateOperationType,
473
41
    ) -> Result<(), Error> {
474
41
        let mut last_err = None;
475
42
        for _ in 0..MAX_UPDATE_RETRIES {
476
42
            let maybe_awaited_action_subscriber = self
477
42
                .action_db
478
42
                .get_by_operation_id(operation_id)
479
0
                .await
480
42
                .err_tip(|| 
"In SimpleSchedulerStateManager::update_operation"0
)
?0
;
481
42
            let Some(
awaited_action_subscriber41
) = maybe_awaited_action_subscriber else {
  Branch (481:17): [True: 0, False: 0]
  Branch (481:17): [True: 0, False: 0]
  Branch (481:17): [Folded - Ignored]
  Branch (481:17): [True: 39, False: 0]
  Branch (481:17): [True: 2, False: 1]
482
                // No action found. It is ok if the action was not found. It
483
                // probably means that the action was dropped, but worker was
484
                // still processing it.
485
1
                return Ok(());
486
            };
487
488
41
            let mut awaited_action = awaited_action_subscriber
489
41
                .borrow()
490
0
                .await
491
41
                .err_tip(|| 
"In SimpleSchedulerStateManager::update_operation"0
)
?0
;
492
493
            // Make sure the worker id matches the awaited action worker id.
494
            // This might happen if the worker sending the update is not the
495
            // worker that was assigned.
496
41
            if awaited_action.worker_id().is_some()
  Branch (496:16): [True: 0, False: 0]
  Branch (496:16): [True: 0, False: 0]
  Branch (496:16): [Folded - Ignored]
  Branch (496:16): [True: 13, False: 26]
  Branch (496:16): [True: 0, False: 2]
497
13
                && maybe_worker_id.is_some()
  Branch (497:20): [True: 0, False: 0]
  Branch (497:20): [True: 0, False: 0]
  Branch (497:20): [Folded - Ignored]
  Branch (497:20): [True: 12, False: 1]
  Branch (497:20): [True: 0, False: 0]
498
12
                && maybe_worker_id != awaited_action.worker_id().as_ref()
  Branch (498:20): [True: 0, False: 0]
  Branch (498:20): [True: 0, False: 0]
  Branch (498:20): [Folded - Ignored]
  Branch (498:20): [True: 0, False: 12]
  Branch (498:20): [True: 0, False: 0]
499
            {
500
                // If another worker is already assigned to the action, another
501
                // worker probably picked up the action. We should not update the
502
                // action in this case and abort this operation.
503
0
                let err = make_err!(
504
0
                    Code::Aborted,
505
0
                    "Worker ids do not match - {:?} != {:?} for {:?}",
506
0
                    maybe_worker_id,
507
0
                    awaited_action.worker_id(),
508
0
                    awaited_action,
509
0
                );
510
0
                event!(
511
0
                    Level::INFO,
512
0
                    "Worker ids do not match - {:?} != {:?} for {:?}. This is probably due to another worker picking up the action.",
513
0
                    maybe_worker_id,
514
0
                    awaited_action.worker_id(),
515
                    awaited_action,
516
                );
517
0
                return Err(err);
518
41
            }
519
41
520
41
            // Make sure we don't update an action that is already completed.
521
41
            if awaited_action.state().stage.is_finished() {
  Branch (521:16): [True: 0, False: 0]
  Branch (521:16): [True: 0, False: 0]
  Branch (521:16): [Folded - Ignored]
  Branch (521:16): [True: 0, False: 39]
  Branch (521:16): [True: 0, False: 2]
522
0
                return Err(make_err!(
523
0
                    Code::Internal,
524
0
                    "Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}",
525
0
                    awaited_action.state().stage,
526
0
                    maybe_worker_id,
527
0
                ));
528
41
            }
529
530
41
            let stage = match &update {
531
                UpdateOperationType::KeepAlive => {
532
0
                    awaited_action.worker_keep_alive((self.now_fn)().now());
533
0
                    return self
534
0
                        .action_db
535
0
                        .update_awaited_action(awaited_action)
536
0
                        .await
537
0
                        .err_tip(|| "Failed to send KeepAlive in SimpleSchedulerStateManager::update_operation");
538
                }
539
33
                UpdateOperationType::UpdateWithActionStage(stage) => stage.clone(),
540
8
                UpdateOperationType::UpdateWithError(err) => {
541
8
                    // Don't count a backpressure failure as an attempt for an action.
542
8
                    let due_to_backpressure = err.code == Code::ResourceExhausted;
543
8
                    if !due_to_backpressure {
  Branch (543:24): [True: 0, False: 0]
  Branch (543:24): [True: 0, False: 0]
  Branch (543:24): [Folded - Ignored]
  Branch (543:24): [True: 8, False: 0]
  Branch (543:24): [True: 0, False: 0]
544
8
                        awaited_action.attempts += 1;
545
8
                    }
0
546
547
8
                    if awaited_action.attempts > self.max_job_retries {
  Branch (547:24): [True: 0, False: 0]
  Branch (547:24): [True: 0, False: 0]
  Branch (547:24): [Folded - Ignored]
  Branch (547:24): [True: 1, False: 7]
  Branch (547:24): [True: 0, False: 0]
548
1
                        ActionStage::Completed(ActionResult {
549
1
                            execution_metadata: ExecutionMetadata {
550
1
                                worker: maybe_worker_id.map_or_else(String::default, ToString::to_string),
551
1
                                ..ExecutionMetadata::default()
552
1
                            },
553
1
                            error: Some(err.clone().merge(make_err!(
554
1
                                Code::Internal,
555
1
                                "Job cancelled because it attempted to execute too many times {} > {} times {}",
556
1
                                awaited_action.attempts,
557
1
                                self.max_job_retries,
558
1
                                format!("for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"),
559
1
                            ))),
560
1
                            ..ActionResult::default()
561
1
                        })
562
                    } else {
563
7
                        ActionStage::Queued
564
                    }
565
                }
566
            };
567
41
            let now = (self.now_fn)().now();
568
41
            if 
matches!34
(stage, ActionStage::Queued) {
569
7
                // If the action is queued, we need to unset the worker id regardless of
570
7
                // which worker sent the update.
571
7
                awaited_action.set_worker_id(None, now);
572
34
            } else {
573
34
                awaited_action.set_worker_id(maybe_worker_id.copied(), now);
574
34
            }
575
41
            awaited_action.worker_set_state(
576
41
                Arc::new(ActionState {
577
41
                    stage,
578
41
                    // Client id is not known here, it is the responsibility of
579
41
                    // the the subscriber impl to replace this with the
580
41
                    // correct client id.
581
41
                    client_operation_id: operation_id.clone(),
582
41
                    action_digest: awaited_action.action_info().digest(),
583
41
                }),
584
41
                now,
585
41
            );
586
587
41
            let update_action_result = self
588
41
                .action_db
589
41
                .update_awaited_action(awaited_action)
590
0
                .await
591
41
                .err_tip(|| 
"In SimpleSchedulerStateManager::update_operation"2
);
592
41
            if let Err(
err2
) = update_action_result {
  Branch (592:20): [True: 0, False: 0]
  Branch (592:20): [True: 0, False: 0]
  Branch (592:20): [Folded - Ignored]
  Branch (592:20): [True: 0, False: 39]
  Branch (592:20): [True: 2, False: 0]
593
                // We use Aborted to signal that the action was not
594
                // updated because the data being set was not the latest,
595
                // but that the update can be retried.
596
2
                if err.code == Code::Aborted {
  Branch (596:20): [True: 0, False: 0]
  Branch (596:20): [True: 0, False: 0]
  Branch (596:20): [Folded - Ignored]
  Branch (596:20): [True: 0, False: 0]
  Branch (596:20): [True: 1, False: 1]
597
1
                    last_err = Some(err);
598
1
                    continue;
599
1
                }
600
1
                return Err(err);
601
39
            }
602
39
            return Ok(());
603
        }
604
0
        match last_err {
605
0
            Some(err) => Err(err),
606
0
            None => Err(make_err!(
607
0
                Code::Internal,
608
0
                "Failed to update action after {} retries with no error set",
609
0
                MAX_UPDATE_RETRIES,
610
0
            )),
611
        }
612
41
    }
613
614
25
    async fn inner_add_operation(
615
25
        &self,
616
25
        new_client_operation_id: OperationId,
617
25
        action_info: Arc<ActionInfo>,
618
25
    ) -> Result<T::Subscriber, Error> {
619
25
        self.action_db
620
25
            .add_action(new_client_operation_id, action_info)
621
0
            .await
622
25
            .err_tip(|| 
"In SimpleSchedulerStateManager::add_operation"0
)
623
25
    }
624
625
594
    async fn inner_filter_operations<'a, F>(
626
594
        &'a self,
627
594
        filter: OperationFilter,
628
594
        to_action_state_result: F,
629
594
    ) -> Result<ActionStateResultStream<'a>, Error>
630
594
    where
631
594
        F: Fn(T::Subscriber) -> Box<dyn ActionStateResult> + Send + Sync + 'a,
632
594
    {
633
91
        fn sorted_awaited_action_state_for_flags(
634
91
            stage: OperationStageFlags,
635
91
        ) -> Option<SortedAwaitedActionState> {
636
91
            match stage {
637
0
                OperationStageFlags::CacheCheck => Some(SortedAwaitedActionState::CacheCheck),
638
91
                OperationStageFlags::Queued => Some(SortedAwaitedActionState::Queued),
639
0
                OperationStageFlags::Executing => Some(SortedAwaitedActionState::Executing),
640
0
                OperationStageFlags::Completed => Some(SortedAwaitedActionState::Completed),
641
0
                _ => None,
642
            }
643
91
        }
644
645
594
        if let Some(
operation_id0
) = &filter.operation_id {
  Branch (645:16): [True: 0, False: 0]
  Branch (645:16): [True: 0, False: 0]
  Branch (645:16): [True: 0, False: 0]
  Branch (645:16): [True: 0, False: 0]
  Branch (645:16): [Folded - Ignored]
  Branch (645:16): [True: 0, False: 503]
  Branch (645:16): [True: 0, False: 87]
  Branch (645:16): [True: 0, False: 0]
  Branch (645:16): [True: 0, False: 4]
646
0
            let maybe_subscriber = self
647
0
                .action_db
648
0
                .get_by_operation_id(operation_id)
649
0
                .await
650
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
651
0
            let Some(subscriber) = maybe_subscriber else {
  Branch (651:17): [True: 0, False: 0]
  Branch (651:17): [True: 0, False: 0]
  Branch (651:17): [True: 0, False: 0]
  Branch (651:17): [True: 0, False: 0]
  Branch (651:17): [Folded - Ignored]
  Branch (651:17): [True: 0, False: 0]
  Branch (651:17): [True: 0, False: 0]
  Branch (651:17): [True: 0, False: 0]
  Branch (651:17): [True: 0, False: 0]
652
0
                return Ok(Box::pin(stream::empty()));
653
            };
654
0
            let awaited_action = subscriber
655
0
                .borrow()
656
0
                .await
657
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
658
0
            if !self.apply_filter_predicate(&awaited_action, &filter).await {
  Branch (658:16): [True: 0, False: 0]
  Branch (658:16): [True: 0, False: 0]
  Branch (658:16): [True: 0, False: 0]
  Branch (658:16): [True: 0, False: 0]
  Branch (658:16): [Folded - Ignored]
  Branch (658:16): [True: 0, False: 0]
  Branch (658:16): [True: 0, False: 0]
  Branch (658:16): [True: 0, False: 0]
  Branch (658:16): [True: 0, False: 0]
659
0
                return Ok(Box::pin(stream::empty()));
660
0
            }
661
0
            return Ok(Box::pin(stream::once(async move {
662
0
                to_action_state_result(subscriber)
663
0
            })));
664
594
        }
665
594
        if let Some(
client_operation_id503
) = &filter.client_operation_id {
  Branch (665:16): [True: 0, False: 0]
  Branch (665:16): [True: 0, False: 0]
  Branch (665:16): [True: 0, False: 0]
  Branch (665:16): [True: 0, False: 0]
  Branch (665:16): [Folded - Ignored]
  Branch (665:16): [True: 503, False: 0]
  Branch (665:16): [True: 0, False: 87]
  Branch (665:16): [True: 0, False: 0]
  Branch (665:16): [True: 0, False: 4]
666
503
            let maybe_subscriber = self
667
503
                .action_db
668
503
                .get_awaited_action_by_id(client_operation_id)
669
0
                .await
670
503
                .err_tip(|| 
"In SimpleSchedulerStateManager::filter_operations"0
)
?0
;
671
503
            let Some(
subscriber3
) = maybe_subscriber else {
  Branch (671:17): [True: 0, False: 0]
  Branch (671:17): [True: 0, False: 0]
  Branch (671:17): [True: 0, False: 0]
  Branch (671:17): [True: 0, False: 0]
  Branch (671:17): [Folded - Ignored]
  Branch (671:17): [True: 3, False: 500]
  Branch (671:17): [True: 0, False: 0]
  Branch (671:17): [True: 0, False: 0]
  Branch (671:17): [True: 0, False: 0]
672
500
                return Ok(Box::pin(stream::empty()));
673
            };
674
3
            let awaited_action = subscriber
675
3
                .borrow()
676
0
                .await
677
3
                .err_tip(|| 
"In SimpleSchedulerStateManager::filter_operations"0
)
?0
;
678
3
            if !self.apply_filter_predicate(&awaited_action, &filter).
await0
{
  Branch (678:16): [True: 0, False: 0]
  Branch (678:16): [True: 0, False: 0]
  Branch (678:16): [True: 0, False: 0]
  Branch (678:16): [True: 0, False: 0]
  Branch (678:16): [Folded - Ignored]
  Branch (678:16): [True: 0, False: 3]
  Branch (678:16): [True: 0, False: 0]
  Branch (678:16): [True: 0, False: 0]
  Branch (678:16): [True: 0, False: 0]
679
0
                return Ok(Box::pin(stream::empty()));
680
3
            }
681
3
            return Ok(Box::pin(stream::once(async move {
682
3
                to_action_state_result(subscriber)
683
3
            })));
684
91
        }
685
686
91
        let Some(sorted_awaited_action_state) =
  Branch (686:13): [True: 0, False: 0]
  Branch (686:13): [True: 0, False: 0]
  Branch (686:13): [True: 0, False: 0]
  Branch (686:13): [True: 0, False: 0]
  Branch (686:13): [Folded - Ignored]
  Branch (686:13): [True: 0, False: 0]
  Branch (686:13): [True: 87, False: 0]
  Branch (686:13): [True: 0, False: 0]
  Branch (686:13): [True: 4, False: 0]
687
91
            sorted_awaited_action_state_for_flags(filter.stages)
688
        else {
689
0
            let mut all_items: Vec<_> = self
690
0
                .action_db
691
0
                .get_all_awaited_actions()
692
0
                .await
693
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?
694
0
                .and_then(|awaited_action_subscriber| async move {
695
0
                    let awaited_action = awaited_action_subscriber
696
0
                        .borrow()
697
0
                        .await
698
0
                        .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
699
0
                    Ok((awaited_action_subscriber, awaited_action))
700
0
                })
701
0
                .try_filter_map(|(subscriber, awaited_action)| {
702
0
                    let filter = filter.clone();
703
0
                    async move {
704
0
                        if self.apply_filter_predicate(&awaited_action, &filter).await {
  Branch (704:28): [True: 0, False: 0]
  Branch (704:28): [True: 0, False: 0]
  Branch (704:28): [True: 0, False: 0]
  Branch (704:28): [True: 0, False: 0]
  Branch (704:28): [Folded - Ignored]
  Branch (704:28): [True: 0, False: 0]
  Branch (704:28): [True: 0, False: 0]
  Branch (704:28): [True: 0, False: 0]
  Branch (704:28): [True: 0, False: 0]
705
0
                            Ok(Some((subscriber, awaited_action.sort_key())))
706
                        } else {
707
0
                            Ok(None)
708
                        }
709
0
                    }
710
0
                })
711
0
                .try_collect()
712
0
                .await
713
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
714
0
            match filter.order_by_priority_direction {
715
0
                Some(OrderDirection::Asc) => all_items.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)),
716
0
                Some(OrderDirection::Desc) => all_items.sort_unstable_by(|(_, a), (_, b)| b.cmp(a)),
717
0
                None => {}
718
            }
719
0
            return Ok(Box::pin(stream::iter(
720
0
                all_items
721
0
                    .into_iter()
722
0
                    .map(move |(subscriber, _)| to_action_state_result(subscriber)),
723
0
            )));
724
        };
725
726
91
        let desc = 
matches!0
(
727
91
            filter.order_by_priority_direction,
728
            Some(OrderDirection::Desc)
729
        );
730
91
        let stream = self
731
91
            .action_db
732
91
            .get_range_of_actions(
733
91
                sorted_awaited_action_state,
734
91
                Bound::Unbounded,
735
91
                Bound::Unbounded,
736
91
                desc,
737
91
            )
738
0
            .await
739
91
            .err_tip(|| 
"In SimpleSchedulerStateManager::filter_operations"0
)
?0
740
91
            .and_then(|awaited_action_subscriber| async move 
{42
741
42
                let awaited_action = awaited_action_subscriber
742
42
                    .borrow()
743
0
                    .await
744
42
                    .err_tip(|| 
"In SimpleSchedulerStateManager::filter_operations"0
)
?0
;
745
42
                Ok((awaited_action_subscriber, awaited_action))
746
91
            
}84
)
747
91
            .try_filter_map(move |(subscriber, awaited_action)| {
748
42
                let filter = filter.clone();
749
42
                async move {
750
42
                    if self.apply_filter_predicate(&awaited_action, &filter).
await0
{
  Branch (750:24): [True: 0, False: 0]
  Branch (750:24): [True: 0, False: 0]
  Branch (750:24): [True: 0, False: 0]
  Branch (750:24): [True: 0, False: 0]
  Branch (750:24): [Folded - Ignored]
  Branch (750:24): [True: 0, False: 0]
  Branch (750:24): [True: 40, False: 0]
  Branch (750:24): [True: 0, False: 0]
  Branch (750:24): [True: 2, False: 0]
751
42
                        Ok(Some(subscriber))
752
                    } else {
753
0
                        Ok(None)
754
                    }
755
42
                }
756
91
            
}42
)
757
91
            .map(move |result| -> Box<dyn ActionStateResult> {
758
42
                result.map_or_else(
759
42
                    |e| -> Box<dyn ActionStateResult> 
{ Box::new(ErrorActionStateResult(e)) }0
,
760
42
                    |v| -> Box<dyn ActionStateResult> { to_action_state_result(v) },
761
42
                )
762
91
            });
763
91
        Ok(Box::pin(stream))
764
594
    }
765
}
766
767
#[async_trait]
768
impl<T, I, NowFn> ClientStateManager for SimpleSchedulerStateManager<T, I, NowFn>
769
where
770
    T: AwaitedActionDb,
771
    I: InstantWrapper,
772
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
773
{
774
    async fn add_action(
775
        &self,
776
        client_operation_id: OperationId,
777
        action_info: Arc<ActionInfo>,
778
25
    ) -> Result<Box<dyn ActionStateResult>, Error> {
779
25
        let sub = self
780
25
            .inner_add_operation(client_operation_id.clone(), action_info.clone())
781
0
            .await?;
782
783
25
        Ok(Box::new(ClientActionStateResult::new(
784
25
            sub,
785
25
            self.weak_self.clone(),
786
25
            self.no_event_action_timeout,
787
25
            self.now_fn.clone(),
788
25
        )))
789
50
    }
790
791
    async fn filter_operations<'a>(
792
        &'a self,
793
        filter: OperationFilter,
794
503
    ) -> Result<ActionStateResultStream<'a>, Error> {
795
503
        self.inner_filter_operations(filter, move |rx| {
796
3
            Box::new(ClientActionStateResult::new(
797
3
                rx,
798
3
                self.weak_self.clone(),
799
3
                self.no_event_action_timeout,
800
3
                self.now_fn.clone(),
801
3
            ))
802
503
        })
803
0
        .await
804
1.00k
    }
805
806
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
807
0
        None
808
0
    }
809
}
810
811
#[async_trait]
812
impl<T, I, NowFn> WorkerStateManager for SimpleSchedulerStateManager<T, I, NowFn>
813
where
814
    T: AwaitedActionDb,
815
    I: InstantWrapper,
816
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
817
{
818
    async fn update_operation(
819
        &self,
820
        operation_id: &OperationId,
821
        worker_id: &WorkerId,
822
        update: UpdateOperationType,
823
12
    ) -> Result<(), Error> {
824
12
        self.inner_update_operation(operation_id, Some(worker_id), update)
825
0
            .await
826
24
    }
827
}
828
829
#[async_trait]
830
impl<T, I, NowFn> MatchingEngineStateManager for SimpleSchedulerStateManager<T, I, NowFn>
831
where
832
    T: AwaitedActionDb,
833
    I: InstantWrapper,
834
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
835
{
836
    async fn filter_operations<'a>(
837
        &'a self,
838
        filter: OperationFilter,
839
91
    ) -> Result<ActionStateResultStream<'a>, Error> {
840
91
        self.inner_filter_operations(filter, |rx| {
841
42
            Box::new(MatchingEngineActionStateResult::new(
842
42
                rx,
843
42
                self.weak_self.clone(),
844
42
                self.no_event_action_timeout,
845
42
                self.now_fn.clone(),
846
42
            ))
847
91
        })
848
0
        .await
849
182
    }
850
851
    async fn assign_operation(
852
        &self,
853
        operation_id: &OperationId,
854
        worker_id_or_reason_for_unassign: Result<&WorkerId, Error>,
855
29
    ) -> Result<(), Error> {
856
29
        let (maybe_worker_id, update) = match worker_id_or_reason_for_unassign {
857
28
            Ok(worker_id) => (
858
28
                Some(worker_id),
859
28
                UpdateOperationType::UpdateWithActionStage(ActionStage::Executing),
860
28
            ),
861
1
            Err(err) => (None, UpdateOperationType::UpdateWithError(err)),
862
        };
863
29
        self.inner_update_operation(operation_id, maybe_worker_id, update)
864
0
            .await
865
58
    }
866
}