Coverage Report

Created: 2025-03-08 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/simple_scheduler_state_manager.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::ops::Bound;
16
use std::string::ToString;
17
use std::sync::{Arc, Weak};
18
use std::time::{Duration, SystemTime};
19
20
use async_lock::Mutex;
21
use async_trait::async_trait;
22
use futures::{stream, StreamExt, TryStreamExt};
23
use nativelink_error::{make_err, Code, Error, ResultExt};
24
use nativelink_metric::MetricsComponent;
25
use nativelink_util::action_messages::{
26
    ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ExecutionMetadata,
27
    OperationId, WorkerId,
28
};
29
use nativelink_util::instant_wrapper::InstantWrapper;
30
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
31
use nativelink_util::operation_state_manager::{
32
    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
33
    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType, WorkerStateManager,
34
};
35
use nativelink_util::origin_event::OriginMetadata;
36
use tracing::{event, Level};
37
38
use super::awaited_action_db::{
39
    AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedActionState,
40
};
41
42
/// Maximum number of times an update to the database
43
/// can fail before giving up.
44
const MAX_UPDATE_RETRIES: usize = 5;
45
46
/// Simple struct that implements the `ActionStateResult` trait and always returns an error.
47
struct ErrorActionStateResult(Error);
48
49
#[async_trait]
50
impl ActionStateResult for ErrorActionStateResult {
51
0
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
52
0
        Err(self.0.clone())
53
0
    }
54
55
0
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
56
0
        Err(self.0.clone())
57
0
    }
58
59
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
60
0
        Err(self.0.clone())
61
0
    }
62
}
63
64
struct ClientActionStateResult<U, T, I, NowFn>
65
where
66
    U: AwaitedActionSubscriber,
67
    T: AwaitedActionDb,
68
    I: InstantWrapper,
69
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
70
{
71
    inner: MatchingEngineActionStateResult<U, T, I, NowFn>,
72
}
73
74
impl<U, T, I, NowFn> ClientActionStateResult<U, T, I, NowFn>
75
where
76
    U: AwaitedActionSubscriber,
77
    T: AwaitedActionDb,
78
    I: InstantWrapper,
79
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
80
{
81
530
    const fn new(
82
530
        sub: U,
83
530
        simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
84
530
        no_event_action_timeout: Duration,
85
530
        now_fn: NowFn,
86
530
    ) -> Self {
87
530
        Self {
88
530
            inner: MatchingEngineActionStateResult::new(
89
530
                sub,
90
530
                simple_scheduler_state_manager,
91
530
                no_event_action_timeout,
92
530
                now_fn,
93
530
            ),
94
530
        }
95
530
    }
96
}
97
98
#[async_trait]
99
impl<U, T, I, NowFn> ActionStateResult for ClientActionStateResult<U, T, I, NowFn>
100
where
101
    U: AwaitedActionSubscriber,
102
    T: AwaitedActionDb,
103
    I: InstantWrapper,
104
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
105
{
106
3
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
107
3
        self.inner.as_state().await
108
6
    }
109
110
48
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
111
48
        self.inner.changed().await
112
92
    }
113
114
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
115
0
        self.inner.as_action_info().await
116
0
    }
117
}
118
119
struct MatchingEngineActionStateResult<U, T, I, NowFn>
120
where
121
    U: AwaitedActionSubscriber,
122
    T: AwaitedActionDb,
123
    I: InstantWrapper,
124
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
125
{
126
    awaited_action_sub: U,
127
    simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
128
    no_event_action_timeout: Duration,
129
    now_fn: NowFn,
130
}
131
impl<U, T, I, NowFn> MatchingEngineActionStateResult<U, T, I, NowFn>
132
where
133
    U: AwaitedActionSubscriber,
134
    T: AwaitedActionDb,
135
    I: InstantWrapper,
136
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
137
{
138
575
    const fn new(
139
575
        awaited_action_sub: U,
140
575
        simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
141
575
        no_event_action_timeout: Duration,
142
575
        now_fn: NowFn,
143
575
    ) -> Self {
144
575
        Self {
145
575
            awaited_action_sub,
146
575
            simple_scheduler_state_manager,
147
575
            no_event_action_timeout,
148
575
            now_fn,
149
575
        }
150
575
    }
151
}
152
153
#[async_trait]
154
impl<U, T, I, NowFn> ActionStateResult for MatchingEngineActionStateResult<U, T, I, NowFn>
155
where
156
    U: AwaitedActionSubscriber,
157
    T: AwaitedActionDb,
158
    I: InstantWrapper,
159
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
160
{
161
31
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
162
31
        let awaited_action = self
163
31
            .awaited_action_sub
164
31
            .borrow()
165
31
            .await
166
31
            .err_tip(|| 
"In MatchingEngineActionStateResult::as_state"0
)
?0
;
167
31
        Ok((
168
31
            awaited_action.state().clone(),
169
31
            awaited_action.maybe_origin_metadata().cloned(),
170
31
        ))
171
62
    }
172
173
48
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
174
48
        let mut timeout_attempts = 0;
175
        loop {
176
58
            tokio::select! {
177
58
                
awaited_action_result44
= self.awaited_action_sub.changed() => {
178
44
                    return awaited_action_result
179
44
                        .err_tip(|| 
"In MatchingEngineActionStateResult::changed"0
)
180
44
                        .map(|v| (v.state().clone(), v.maybe_origin_metadata().cloned()));
181
                }
182
58
                () = (self.now_fn)().sleep(self.no_event_action_timeout) => {
183
10
                    // Timeout happened, do additional checks below.
184
10
                }
185
            }
186
187
10
            let awaited_action = self
188
10
                .awaited_action_sub
189
10
                .borrow()
190
10
                .await
191
10
                .err_tip(|| 
"In MatchingEngineActionStateResult::changed"0
)
?0
;
192
193
10
            if 
matches!1
(awaited_action.state().stage, ActionStage::Queued) {
194
                // Actions in queued state do not get periodically updated,
195
                // so we don't need to timeout them.
196
9
                continue;
197
1
            }
198
199
1
            let simple_scheduler_state_manager = self
200
1
                .simple_scheduler_state_manager
201
1
                .upgrade()
202
1
                .err_tip(|| 
format!("Failed to upgrade weak reference to SimpleSchedulerStateManager in MatchingEngineActionStateResult::changed at attempt: {timeout_attempts}")0
)
?0
;
203
204
1
            event!(
205
1
                Level::WARN,
206
                ?awaited_action,
207
1
                "OperationId {} / {} timed out after {} seconds issuing a retry",
208
1
                awaited_action.operation_id(),
209
1
                awaited_action.state().client_operation_id,
210
1
                self.no_event_action_timeout.as_secs_f32(),
211
            );
212
213
1
            simple_scheduler_state_manager
214
1
                .timeout_operation_id(awaited_action.operation_id())
215
1
                .await
216
1
                .err_tip(|| 
"In MatchingEngineActionStateResult::changed"0
)
?0
;
217
218
1
            if timeout_attempts >= MAX_UPDATE_RETRIES {
  Branch (218:16): [True: 0, False: 0]
  Branch (218:16): [True: 0, False: 0]
  Branch (218:16): [Folded - Ignored]
  Branch (218:16): [True: 0, False: 1]
  Branch (218:16): [True: 0, False: 0]
219
0
                return Err(make_err!(
220
0
                    Code::Internal,
221
0
                    "Failed to update action after {} retries with no error set in MatchingEngineActionStateResult::changed - {} {:?}",
222
0
                    MAX_UPDATE_RETRIES,
223
0
                    awaited_action.operation_id(),
224
0
                    awaited_action.state().stage,
225
0
                ));
226
1
            }
227
1
            timeout_attempts += 1;
228
        }
229
92
    }
230
231
45
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
232
45
        let awaited_action = self
233
45
            .awaited_action_sub
234
45
            .borrow()
235
45
            .await
236
45
            .err_tip(|| 
"In MatchingEngineActionStateResult::as_action_info"0
)
?0
;
237
45
        Ok((
238
45
            awaited_action.action_info().clone(),
239
45
            awaited_action.maybe_origin_metadata().cloned(),
240
45
        ))
241
90
    }
242
}
243
244
/// `SimpleSchedulerStateManager` is responsible for maintaining the state of the scheduler.
245
/// Scheduler state includes the actions that are queued, active, and recently completed.
246
/// It also includes the workers that are available to execute actions based on allocation
247
/// strategy.
248
#[derive(MetricsComponent)]
249
pub struct SimpleSchedulerStateManager<T, I, NowFn>
250
where
251
    T: AwaitedActionDb,
252
    I: InstantWrapper,
253
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
254
{
255
    /// Database for storing the state of all actions.
256
    #[metric(group = "action_db")]
257
    action_db: T,
258
259
    /// Maximum number of times a job can be retried.
260
    // TODO(allada) This should be a scheduler decorator instead
261
    // of always having it on every SimpleScheduler.
262
    #[metric(help = "Maximum number of times a job can be retried")]
263
    max_job_retries: usize,
264
265
    /// Duration after which an action is considered to be timed out if
266
    /// no event is received.
267
    #[metric(
268
        help = "Duration after which an action is considered to be timed out if no event is received"
269
    )]
270
    no_event_action_timeout: Duration,
271
272
    /// Mark operation as timed out if the worker has not updated in this duration.
273
    /// This is used to prevent operations from being stuck in the queue forever
274
    /// if it is not being processed by any worker.
275
    client_action_timeout: Duration,
276
277
    // A lock to ensure only one timeout operation is running at a time
278
    // on this service.
279
    timeout_operation_mux: Mutex<()>,
280
281
    /// Weak reference to self.
282
    // We use a weak reference to reduce the risk of a memory leak from
283
    // future changes. If this becomes some kind of performance issue,
284
    // we can consider using a strong reference.
285
    weak_self: Weak<Self>,
286
287
    /// Function to get the current time.
288
    now_fn: NowFn,
289
}
290
291
impl<T, I, NowFn> SimpleSchedulerStateManager<T, I, NowFn>
292
where
293
    T: AwaitedActionDb,
294
    I: InstantWrapper,
295
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
296
{
297
22
    pub fn new(
298
22
        max_job_retries: usize,
299
22
        no_event_action_timeout: Duration,
300
22
        client_action_timeout: Duration,
301
22
        action_db: T,
302
22
        now_fn: NowFn,
303
22
    ) -> Arc<Self> {
304
22
        Arc::new_cyclic(|weak_self| Self {
305
22
            action_db,
306
22
            max_job_retries,
307
22
            no_event_action_timeout,
308
22
            client_action_timeout,
309
22
            timeout_operation_mux: Mutex::new(()),
310
22
            weak_self: weak_self.clone(),
311
22
            now_fn,
312
22
        })
313
22
    }
314
315
548
    async fn apply_filter_predicate(
316
548
        &self,
317
548
        awaited_action: &AwaitedAction,
318
548
        filter: &OperationFilter,
319
548
    ) -> bool {
320
548
        // Note: The caller must filter `client_operation_id`.
321
548
322
548
        if awaited_action.last_client_keepalive_timestamp() + self.client_action_timeout
  Branch (322:12): [True: 0, False: 0]
  Branch (322:12): [True: 0, False: 0]
  Branch (322:12): [Folded - Ignored]
  Branch (322:12): [True: 0, False: 546]
  Branch (322:12): [True: 0, False: 2]
323
548
            < (self.now_fn)().now()
324
        {
325
0
            if !awaited_action.state().stage.is_finished() {
  Branch (325:16): [True: 0, False: 0]
  Branch (325:16): [True: 0, False: 0]
  Branch (325:16): [Folded - Ignored]
  Branch (325:16): [True: 0, False: 0]
  Branch (325:16): [True: 0, False: 0]
326
0
                let mut state = awaited_action.state().as_ref().clone();
327
0
                state.stage = ActionStage::Completed(ActionResult {
328
0
                    error: Some(make_err!(
329
0
                        Code::DeadlineExceeded,
330
0
                        "Operation timed out {} seconds of having no more clients listening",
331
0
                        self.client_action_timeout.as_secs_f32(),
332
0
                    )),
333
0
                    ..ActionResult::default()
334
0
                });
335
0
                let mut new_awaited_action = awaited_action.clone();
336
0
                new_awaited_action.worker_set_state(Arc::new(state), (self.now_fn)().now());
337
0
                if let Err(err) = self
  Branch (337:24): [True: 0, False: 0]
  Branch (337:24): [True: 0, False: 0]
  Branch (337:24): [Folded - Ignored]
  Branch (337:24): [True: 0, False: 0]
  Branch (337:24): [True: 0, False: 0]
338
0
                    .action_db
339
0
                    .update_awaited_action(new_awaited_action)
340
0
                    .await
341
                {
342
0
                    event!(
343
0
                        Level::WARN,
344
0
                        "Failed to update action to timed out state after client keepalive timeout. This is ok if multiple schedulers tried to set the state at the same time: {err}",
345
                    );
346
0
                }
347
0
            }
348
0
            return false;
349
548
        }
350
351
548
        if let Some(
operation_id0
) = &filter.operation_id {
  Branch (351:16): [True: 0, False: 0]
  Branch (351:16): [True: 0, False: 0]
  Branch (351:16): [Folded - Ignored]
  Branch (351:16): [True: 0, False: 546]
  Branch (351:16): [True: 0, False: 2]
352
0
            if operation_id != awaited_action.operation_id() {
  Branch (352:16): [True: 0, False: 0]
  Branch (352:16): [True: 0, False: 0]
  Branch (352:16): [Folded - Ignored]
  Branch (352:16): [True: 0, False: 0]
  Branch (352:16): [True: 0, False: 0]
353
0
                return false;
354
0
            }
355
548
        }
356
357
548
        if filter.worker_id.is_some() && 
filter.worker_id.as_ref() != awaited_action.worker_id()0
{
  Branch (357:12): [True: 0, False: 0]
  Branch (357:42): [True: 0, False: 0]
  Branch (357:12): [True: 0, False: 0]
  Branch (357:42): [True: 0, False: 0]
  Branch (357:12): [Folded - Ignored]
  Branch (357:42): [Folded - Ignored]
  Branch (357:12): [True: 0, False: 546]
  Branch (357:42): [True: 0, False: 0]
  Branch (357:12): [True: 0, False: 2]
  Branch (357:42): [True: 0, False: 0]
358
0
            return false;
359
548
        }
360
361
        {
362
548
            if let Some(
filter_unique_key0
) = &filter.unique_key {
  Branch (362:20): [True: 0, False: 0]
  Branch (362:20): [True: 0, False: 0]
  Branch (362:20): [Folded - Ignored]
  Branch (362:20): [True: 0, False: 546]
  Branch (362:20): [True: 0, False: 2]
363
0
                match &awaited_action.action_info().unique_qualifier {
364
0
                    ActionUniqueQualifier::Cachable(unique_key) => {
365
0
                        if filter_unique_key != unique_key {
  Branch (365:28): [True: 0, False: 0]
  Branch (365:28): [True: 0, False: 0]
  Branch (365:28): [Folded - Ignored]
  Branch (365:28): [True: 0, False: 0]
  Branch (365:28): [True: 0, False: 0]
366
0
                            return false;
367
0
                        }
368
                    }
369
                    ActionUniqueQualifier::Uncachable(_) => {
370
0
                        return false;
371
                    }
372
                }
373
548
            }
374
548
            if let Some(
action_digest0
) = filter.action_digest {
  Branch (374:20): [True: 0, False: 0]
  Branch (374:20): [True: 0, False: 0]
  Branch (374:20): [Folded - Ignored]
  Branch (374:20): [True: 0, False: 546]
  Branch (374:20): [True: 0, False: 2]
375
0
                if action_digest != awaited_action.action_info().digest() {
  Branch (375:20): [True: 0, False: 0]
  Branch (375:20): [True: 0, False: 0]
  Branch (375:20): [Folded - Ignored]
  Branch (375:20): [True: 0, False: 0]
  Branch (375:20): [True: 0, False: 0]
376
0
                    return false;
377
0
                }
378
548
            }
379
        }
380
381
        {
382
548
            let last_worker_update_timestamp = awaited_action.last_worker_updated_timestamp();
383
548
            if let Some(
worker_update_before0
) = filter.worker_update_before {
  Branch (383:20): [True: 0, False: 0]
  Branch (383:20): [True: 0, False: 0]
  Branch (383:20): [Folded - Ignored]
  Branch (383:20): [True: 0, False: 546]
  Branch (383:20): [True: 0, False: 2]
384
0
                if worker_update_before < last_worker_update_timestamp {
  Branch (384:20): [True: 0, False: 0]
  Branch (384:20): [True: 0, False: 0]
  Branch (384:20): [Folded - Ignored]
  Branch (384:20): [True: 0, False: 0]
  Branch (384:20): [True: 0, False: 0]
385
0
                    return false;
386
0
                }
387
548
            }
388
548
            if let Some(
completed_before0
) = filter.completed_before {
  Branch (388:20): [True: 0, False: 0]
  Branch (388:20): [True: 0, False: 0]
  Branch (388:20): [Folded - Ignored]
  Branch (388:20): [True: 0, False: 546]
  Branch (388:20): [True: 0, False: 2]
389
0
                if awaited_action.state().stage.is_finished()
  Branch (389:20): [True: 0, False: 0]
  Branch (389:20): [True: 0, False: 0]
  Branch (389:20): [Folded - Ignored]
  Branch (389:20): [True: 0, False: 0]
  Branch (389:20): [True: 0, False: 0]
390
0
                    && completed_before < last_worker_update_timestamp
  Branch (390:24): [True: 0, False: 0]
  Branch (390:24): [True: 0, False: 0]
  Branch (390:24): [Folded - Ignored]
  Branch (390:24): [True: 0, False: 0]
  Branch (390:24): [True: 0, False: 0]
391
                {
392
0
                    return false;
393
0
                }
394
548
            }
395
548
            if filter.stages != OperationStageFlags::Any {
  Branch (395:16): [True: 0, False: 0]
  Branch (395:16): [True: 0, False: 0]
  Branch (395:16): [Folded - Ignored]
  Branch (395:16): [True: 543, False: 3]
  Branch (395:16): [True: 2, False: 0]
396
545
                let stage_flag = match awaited_action.state().stage {
397
0
                    ActionStage::Unknown => OperationStageFlags::Any,
398
0
                    ActionStage::CacheCheck => OperationStageFlags::CacheCheck,
399
545
                    ActionStage::Queued => OperationStageFlags::Queued,
400
0
                    ActionStage::Executing => OperationStageFlags::Executing,
401
                    ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => {
402
0
                        OperationStageFlags::Completed
403
                    }
404
                };
405
545
                if !filter.stages.intersects(stage_flag) {
  Branch (405:20): [True: 0, False: 0]
  Branch (405:20): [True: 0, False: 0]
  Branch (405:20): [Folded - Ignored]
  Branch (405:20): [True: 0, False: 543]
  Branch (405:20): [True: 0, False: 2]
406
0
                    return false;
407
545
                }
408
3
            }
409
        }
410
411
548
        true
412
548
    }
413
414
    /// Let the scheduler know that an operation has timed out from
415
    /// the client side (ie: worker has not updated in a while).
416
1
    async fn timeout_operation_id(&self, operation_id: &OperationId) -> Result<(), Error> {
417
        // Ensure that only one timeout operation is running at a time.
418
        // Failing to do this could result in the same operation being
419
        // timed out multiple times at the same time.
420
        // Note: We could implement this on a per-operation_id basis, but it is quite
421
        // complex to manage the locks.
422
1
        let _lock = self.timeout_operation_mux.lock().await;
423
424
1
        let awaited_action_subscriber = self
425
1
            .action_db
426
1
            .get_by_operation_id(operation_id)
427
1
            .await
428
1
            .err_tip(|| 
"In SimpleSchedulerStateManager::timeout_operation_id"0
)
?0
429
1
            .err_tip(|| {
430
0
                format!("Operation id {operation_id} does not exist in SimpleSchedulerStateManager::timeout_operation_id")
431
1
            })
?0
;
432
433
1
        let awaited_action = awaited_action_subscriber
434
1
            .borrow()
435
1
            .await
436
1
            .err_tip(|| 
"In SimpleSchedulerStateManager::timeout_operation_id"0
)
?0
;
437
438
        // If the action is not executing, we should not timeout the action.
439
1
        if !
matches!0
(awaited_action.state().stage, ActionStage::Executing) {
  Branch (439:12): [True: 0, False: 0]
  Branch (439:12): [True: 0, False: 0]
  Branch (439:12): [Folded - Ignored]
  Branch (439:12): [True: 0, False: 1]
  Branch (439:12): [True: 0, False: 0]
440
0
            return Ok(());
441
1
        }
442
443
1
        let last_worker_updated = awaited_action
444
1
            .last_worker_updated_timestamp()
445
1
            .duration_since(SystemTime::UNIX_EPOCH)
446
1
            .map_err(|e| {
447
0
                make_err!(
448
0
                    Code::Internal,
449
0
                    "Failed to convert last_worker_updated to duration since epoch {e:?}"
450
0
                )
451
1
            })
?0
;
452
1
        let worker_should_update_before = last_worker_updated
453
1
            .checked_add(self.no_event_action_timeout)
454
1
            .err_tip(|| 
"Timestamp too big in SimpleSchedulerStateManager::timeout_operation_id"0
)
?0
;
455
1
        if worker_should_update_before < (self.now_fn)().elapsed() {
  Branch (455:12): [True: 0, False: 0]
  Branch (455:12): [True: 0, False: 0]
  Branch (455:12): [Folded - Ignored]
  Branch (455:12): [True: 0, False: 1]
  Branch (455:12): [True: 0, False: 0]
456
            // The action was updated recently, we should not timeout the action.
457
            // This is to prevent timing out actions that have recently been updated
458
            // (like multiple clients timeout the same action at the same time).
459
0
            return Ok(());
460
1
        }
461
1
462
1
        self.assign_operation(
463
1
            operation_id,
464
1
            Err(make_err!(
465
1
                Code::DeadlineExceeded,
466
1
                "Operation timed out after {} seconds",
467
1
                self.no_event_action_timeout.as_secs_f32(),
468
1
            )),
469
1
        )
470
1
        .await
471
1
    }
472
473
41
    async fn inner_update_operation(
474
41
        &self,
475
41
        operation_id: &OperationId,
476
41
        maybe_worker_id: Option<&WorkerId>,
477
41
        update: UpdateOperationType,
478
41
    ) -> Result<(), Error> {
479
41
        let mut last_err = None;
480
42
        for _ in 0..MAX_UPDATE_RETRIES {
481
42
            let maybe_awaited_action_subscriber = self
482
42
                .action_db
483
42
                .get_by_operation_id(operation_id)
484
42
                .await
485
42
                .err_tip(|| 
"In SimpleSchedulerStateManager::update_operation"0
)
?0
;
486
42
            let Some(
awaited_action_subscriber41
) = maybe_awaited_action_subscriber else {
  Branch (486:17): [True: 0, False: 0]
  Branch (486:17): [True: 0, False: 0]
  Branch (486:17): [Folded - Ignored]
  Branch (486:17): [True: 39, False: 0]
  Branch (486:17): [True: 2, False: 1]
487
                // No action found. It is ok if the action was not found. It
488
                // probably means that the action was dropped, but worker was
489
                // still processing it.
490
1
                return Ok(());
491
            };
492
493
41
            let mut awaited_action = awaited_action_subscriber
494
41
                .borrow()
495
41
                .await
496
41
                .err_tip(|| 
"In SimpleSchedulerStateManager::update_operation"0
)
?0
;
497
498
            // Make sure the worker id matches the awaited action worker id.
499
            // This might happen if the worker sending the update is not the
500
            // worker that was assigned.
501
41
            if awaited_action.worker_id().is_some()
  Branch (501:16): [True: 0, False: 0]
  Branch (501:16): [True: 0, False: 0]
  Branch (501:16): [Folded - Ignored]
  Branch (501:16): [True: 13, False: 26]
  Branch (501:16): [True: 0, False: 2]
502
13
                && maybe_worker_id.is_some()
  Branch (502:20): [True: 0, False: 0]
  Branch (502:20): [True: 0, False: 0]
  Branch (502:20): [Folded - Ignored]
  Branch (502:20): [True: 12, False: 1]
  Branch (502:20): [True: 0, False: 0]
503
12
                && maybe_worker_id != awaited_action.worker_id()
  Branch (503:20): [True: 0, False: 0]
  Branch (503:20): [True: 0, False: 0]
  Branch (503:20): [Folded - Ignored]
  Branch (503:20): [True: 0, False: 12]
  Branch (503:20): [True: 0, False: 0]
504
            {
505
                // If another worker is already assigned to the action, another
506
                // worker probably picked up the action. We should not update the
507
                // action in this case and abort this operation.
508
0
                let err = make_err!(
509
0
                    Code::Aborted,
510
0
                    "Worker ids do not match - {:?} != {:?} for {:?}",
511
0
                    maybe_worker_id,
512
0
                    awaited_action.worker_id(),
513
0
                    awaited_action,
514
0
                );
515
0
                event!(
516
0
                    Level::INFO,
517
0
                    "Worker ids do not match - {:?} != {:?} for {:?}. This is probably due to another worker picking up the action.",
518
0
                    maybe_worker_id,
519
0
                    awaited_action.worker_id(),
520
                    awaited_action,
521
                );
522
0
                return Err(err);
523
41
            }
524
41
525
41
            // Make sure we don't update an action that is already completed.
526
41
            if awaited_action.state().stage.is_finished() {
  Branch (526:16): [True: 0, False: 0]
  Branch (526:16): [True: 0, False: 0]
  Branch (526:16): [Folded - Ignored]
  Branch (526:16): [True: 0, False: 39]
  Branch (526:16): [True: 0, False: 2]
527
0
                return Err(make_err!(
528
0
                    Code::Internal,
529
0
                    "Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}",
530
0
                    awaited_action.state().stage,
531
0
                    maybe_worker_id,
532
0
                ));
533
41
            }
534
535
41
            let stage = match &update {
536
                UpdateOperationType::KeepAlive => {
537
0
                    awaited_action.worker_keep_alive((self.now_fn)().now());
538
0
                    return self
539
0
                        .action_db
540
0
                        .update_awaited_action(awaited_action)
541
0
                        .await
542
0
                        .err_tip(|| "Failed to send KeepAlive in SimpleSchedulerStateManager::update_operation");
543
                }
544
33
                UpdateOperationType::UpdateWithActionStage(stage) => stage.clone(),
545
8
                UpdateOperationType::UpdateWithError(err) => {
546
8
                    // Don't count a backpressure failure as an attempt for an action.
547
8
                    let due_to_backpressure = err.code == Code::ResourceExhausted;
548
8
                    if !due_to_backpressure {
  Branch (548:24): [True: 0, False: 0]
  Branch (548:24): [True: 0, False: 0]
  Branch (548:24): [Folded - Ignored]
  Branch (548:24): [True: 8, False: 0]
  Branch (548:24): [True: 0, False: 0]
549
8
                        awaited_action.attempts += 1;
550
8
                    
}0
551
552
8
                    if awaited_action.attempts > self.max_job_retries {
  Branch (552:24): [True: 0, False: 0]
  Branch (552:24): [True: 0, False: 0]
  Branch (552:24): [Folded - Ignored]
  Branch (552:24): [True: 1, False: 7]
  Branch (552:24): [True: 0, False: 0]
553
1
                        ActionStage::Completed(ActionResult {
554
1
                            execution_metadata: ExecutionMetadata {
555
1
                                worker: maybe_worker_id.map_or_else(String::default, ToString::to_string),
556
1
                                ..ExecutionMetadata::default()
557
1
                            },
558
1
                            error: Some(err.clone().merge(make_err!(
559
1
                                Code::Internal,
560
1
                                "Job cancelled because it attempted to execute too many times {} > {} times {}",
561
1
                                awaited_action.attempts,
562
1
                                self.max_job_retries,
563
1
                                format!("for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"),
564
1
                            ))),
565
1
                            ..ActionResult::default()
566
1
                        })
567
                    } else {
568
7
                        ActionStage::Queued
569
                    }
570
                }
571
            };
572
41
            let now = (self.now_fn)().now();
573
41
            if 
matches!34
(stage, ActionStage::Queued) {
574
7
                // If the action is queued, we need to unset the worker id regardless of
575
7
                // which worker sent the update.
576
7
                awaited_action.set_worker_id(None, now);
577
34
            } else {
578
34
                awaited_action.set_worker_id(maybe_worker_id.cloned(), now);
579
34
            }
580
41
            awaited_action.worker_set_state(
581
41
                Arc::new(ActionState {
582
41
                    stage,
583
41
                    // Client id is not known here, it is the responsibility of
584
41
                    // the subscriber impl to replace this with the
585
41
                    // correct client id.
586
41
                    client_operation_id: operation_id.clone(),
587
41
                    action_digest: awaited_action.action_info().digest(),
588
41
                }),
589
41
                now,
590
41
            );
591
592
41
            let update_action_result = self
593
41
                .action_db
594
41
                .update_awaited_action(awaited_action)
595
41
                .await
596
41
                .err_tip(|| 
"In SimpleSchedulerStateManager::update_operation"2
);
597
41
            if let Err(
err2
) = update_action_result {
  Branch (597:20): [True: 0, False: 0]
  Branch (597:20): [True: 0, False: 0]
  Branch (597:20): [Folded - Ignored]
  Branch (597:20): [True: 0, False: 39]
  Branch (597:20): [True: 2, False: 0]
598
                // We use Aborted to signal that the action was not
599
                // updated because the data being set was not the latest
600
                // but can be retried.
601
2
                if err.code == Code::Aborted {
  Branch (601:20): [True: 0, False: 0]
  Branch (601:20): [True: 0, False: 0]
  Branch (601:20): [Folded - Ignored]
  Branch (601:20): [True: 0, False: 0]
  Branch (601:20): [True: 1, False: 1]
602
1
                    last_err = Some(err);
603
1
                    continue;
604
1
                }
605
1
                return Err(err);
606
39
            }
607
39
            return Ok(());
608
        }
609
0
        match last_err {
610
0
            Some(err) => Err(err),
611
0
            None => Err(make_err!(
612
0
                Code::Internal,
613
0
                "Failed to update action after {} retries with no error set",
614
0
                MAX_UPDATE_RETRIES,
615
0
            )),
616
        }
617
41
    }
618
619
27
    async fn inner_add_operation(
620
27
        &self,
621
27
        new_client_operation_id: OperationId,
622
27
        action_info: Arc<ActionInfo>,
623
27
    ) -> Result<T::Subscriber, Error> {
624
27
        self.action_db
625
27
            .add_action(new_client_operation_id, action_info)
626
27
            .await
627
27
            .err_tip(|| 
"In SimpleSchedulerStateManager::add_operation"0
)
628
27
    }
629
630
597
    async fn inner_filter_operations<'a, F>(
631
597
        &'a self,
632
597
        filter: OperationFilter,
633
597
        to_action_state_result: F,
634
597
    ) -> Result<ActionStateResultStream<'a>, Error>
635
597
    where
636
597
        F: Fn(T::Subscriber) -> Box<dyn ActionStateResult> + Send + Sync + 'a,
637
597
    {
638
594
        const fn sorted_awaited_action_state_for_flags(
639
594
            stage: OperationStageFlags,
640
594
        ) -> Option<SortedAwaitedActionState> {
641
594
            match stage {
642
0
                OperationStageFlags::CacheCheck => Some(SortedAwaitedActionState::CacheCheck),
643
594
                OperationStageFlags::Queued => Some(SortedAwaitedActionState::Queued),
644
0
                OperationStageFlags::Executing => Some(SortedAwaitedActionState::Executing),
645
0
                OperationStageFlags::Completed => Some(SortedAwaitedActionState::Completed),
646
0
                _ => None,
647
            }
648
594
        }
649
650
597
        if let Some(
operation_id0
) = &filter.operation_id {
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [Folded - Ignored]
  Branch (650:16): [True: 0, False: 503]
  Branch (650:16): [True: 0, False: 90]
  Branch (650:16): [True: 0, False: 0]
  Branch (650:16): [True: 0, False: 4]
651
0
            let maybe_subscriber = self
652
0
                .action_db
653
0
                .get_by_operation_id(operation_id)
654
0
                .await
655
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
656
0
            let Some(subscriber) = maybe_subscriber else {
  Branch (656:17): [True: 0, False: 0]
  Branch (656:17): [True: 0, False: 0]
  Branch (656:17): [True: 0, False: 0]
  Branch (656:17): [True: 0, False: 0]
  Branch (656:17): [Folded - Ignored]
  Branch (656:17): [True: 0, False: 0]
  Branch (656:17): [True: 0, False: 0]
  Branch (656:17): [True: 0, False: 0]
  Branch (656:17): [True: 0, False: 0]
657
0
                return Ok(Box::pin(stream::empty()));
658
            };
659
0
            let awaited_action = subscriber
660
0
                .borrow()
661
0
                .await
662
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
663
0
            if !self.apply_filter_predicate(&awaited_action, &filter).await {
  Branch (663:16): [True: 0, False: 0]
  Branch (663:16): [True: 0, False: 0]
  Branch (663:16): [True: 0, False: 0]
  Branch (663:16): [True: 0, False: 0]
  Branch (663:16): [Folded - Ignored]
  Branch (663:16): [True: 0, False: 0]
  Branch (663:16): [True: 0, False: 0]
  Branch (663:16): [True: 0, False: 0]
  Branch (663:16): [True: 0, False: 0]
664
0
                return Ok(Box::pin(stream::empty()));
665
0
            }
666
0
            return Ok(Box::pin(stream::once(async move {
667
0
                to_action_state_result(subscriber)
668
0
            })));
669
597
        }
670
597
        if let Some(
client_operation_id3
) = &filter.client_operation_id {
  Branch (670:16): [True: 0, False: 0]
  Branch (670:16): [True: 0, False: 0]
  Branch (670:16): [True: 0, False: 0]
  Branch (670:16): [True: 0, False: 0]
  Branch (670:16): [Folded - Ignored]
  Branch (670:16): [True: 3, False: 500]
  Branch (670:16): [True: 0, False: 90]
  Branch (670:16): [True: 0, False: 0]
  Branch (670:16): [True: 0, False: 4]
671
3
            let maybe_subscriber = self
672
3
                .action_db
673
3
                .get_awaited_action_by_id(client_operation_id)
674
3
                .await
675
3
                .err_tip(|| 
"In SimpleSchedulerStateManager::filter_operations"0
)
?0
;
676
3
            let Some(subscriber) = maybe_subscriber else {
  Branch (676:17): [True: 0, False: 0]
  Branch (676:17): [True: 0, False: 0]
  Branch (676:17): [True: 0, False: 0]
  Branch (676:17): [True: 0, False: 0]
  Branch (676:17): [Folded - Ignored]
  Branch (676:17): [True: 3, False: 0]
  Branch (676:17): [True: 0, False: 0]
  Branch (676:17): [True: 0, False: 0]
  Branch (676:17): [True: 0, False: 0]
677
0
                return Ok(Box::pin(stream::empty()));
678
            };
679
3
            let awaited_action = subscriber
680
3
                .borrow()
681
3
                .await
682
3
                .err_tip(|| 
"In SimpleSchedulerStateManager::filter_operations"0
)
?0
;
683
3
            if !self.apply_filter_predicate(&awaited_action, &filter).await {
  Branch (683:16): [True: 0, False: 0]
  Branch (683:16): [True: 0, False: 0]
  Branch (683:16): [True: 0, False: 0]
  Branch (683:16): [True: 0, False: 0]
  Branch (683:16): [Folded - Ignored]
  Branch (683:16): [True: 0, False: 3]
  Branch (683:16): [True: 0, False: 0]
  Branch (683:16): [True: 0, False: 0]
  Branch (683:16): [True: 0, False: 0]
684
0
                return Ok(Box::pin(stream::empty()));
685
3
            }
686
3
            return Ok(Box::pin(stream::once(async move {
687
3
                to_action_state_result(subscriber)
688
3
            })));
689
594
        }
690
691
594
        let Some(sorted_awaited_action_state) =
  Branch (691:13): [True: 0, False: 0]
  Branch (691:13): [True: 0, False: 0]
  Branch (691:13): [True: 0, False: 0]
  Branch (691:13): [True: 0, False: 0]
  Branch (691:13): [Folded - Ignored]
  Branch (691:13): [True: 500, False: 0]
  Branch (691:13): [True: 90, False: 0]
  Branch (691:13): [True: 0, False: 0]
  Branch (691:13): [True: 4, False: 0]
692
594
            sorted_awaited_action_state_for_flags(filter.stages)
693
        else {
694
0
            let mut all_items: Vec<_> = self
695
0
                .action_db
696
0
                .get_all_awaited_actions()
697
0
                .await
698
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?
699
0
                .and_then(|awaited_action_subscriber| async move {
700
0
                    let awaited_action = awaited_action_subscriber
701
0
                        .borrow()
702
0
                        .await
703
0
                        .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
704
0
                    Ok((awaited_action_subscriber, awaited_action))
705
0
                })
706
0
                .try_filter_map(|(subscriber, awaited_action)| {
707
0
                    let filter = filter.clone();
708
0
                    async move {
709
0
                        if self.apply_filter_predicate(&awaited_action, &filter).await {
  Branch (709:28): [True: 0, False: 0]
  Branch (709:28): [True: 0, False: 0]
  Branch (709:28): [True: 0, False: 0]
  Branch (709:28): [True: 0, False: 0]
  Branch (709:28): [Folded - Ignored]
  Branch (709:28): [True: 0, False: 0]
  Branch (709:28): [True: 0, False: 0]
  Branch (709:28): [True: 0, False: 0]
  Branch (709:28): [True: 0, False: 0]
710
0
                            Ok(Some((subscriber, awaited_action.sort_key())))
711
                        } else {
712
0
                            Ok(None)
713
                        }
714
0
                    }
715
0
                })
716
0
                .try_collect()
717
0
                .await
718
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
719
0
            match filter.order_by_priority_direction {
720
0
                Some(OrderDirection::Asc) => all_items.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)),
721
0
                Some(OrderDirection::Desc) => all_items.sort_unstable_by(|(_, a), (_, b)| b.cmp(a)),
722
0
                None => {}
723
            }
724
0
            return Ok(Box::pin(stream::iter(
725
0
                all_items
726
0
                    .into_iter()
727
0
                    .map(move |(subscriber, _)| to_action_state_result(subscriber)),
728
0
            )));
729
        };
730
731
594
        let desc = 
matches!500
(
732
94
            filter.order_by_priority_direction,
733
            Some(OrderDirection::Desc)
734
        );
735
594
        let stream = self
736
594
            .action_db
737
594
            .get_range_of_actions(
738
594
                sorted_awaited_action_state,
739
594
                Bound::Unbounded,
740
594
                Bound::Unbounded,
741
594
                desc,
742
594
            )
743
594
            .await
744
594
            .err_tip(|| 
"In SimpleSchedulerStateManager::filter_operations"0
)
?0
745
594
            .and_then(|awaited_action_subscriber| async move 
{545
746
545
                let awaited_action = awaited_action_subscriber
747
545
                    .borrow()
748
545
                    .await
749
545
                    .err_tip(|| 
"In SimpleSchedulerStateManager::filter_operations"0
)
?0
;
750
545
                Ok((awaited_action_subscriber, awaited_action))
751
1.09k
            })
752
594
            .try_filter_map(move |(subscriber, awaited_action)| {
753
545
                let filter = filter.clone();
754
545
                async move {
755
545
                    if self.apply_filter_predicate(&awaited_action, &filter).await {
  Branch (755:24): [True: 0, False: 0]
  Branch (755:24): [True: 0, False: 0]
  Branch (755:24): [True: 0, False: 0]
  Branch (755:24): [True: 0, False: 0]
  Branch (755:24): [Folded - Ignored]
  Branch (755:24): [True: 500, False: 0]
  Branch (755:24): [True: 43, False: 0]
  Branch (755:24): [True: 0, False: 0]
  Branch (755:24): [True: 2, False: 0]
756
545
                        Ok(Some(subscriber))
757
                    } else {
758
0
                        Ok(None)
759
                    }
760
545
                }
761
594
            
}545
)
762
594
            .map(move |result| -> Box<dyn ActionStateResult> {
763
545
                result.map_or_else(
764
545
                    |e| -> Box<dyn ActionStateResult> 
{ Box::new(ErrorActionStateResult(e)) }0
,
765
545
                    |v| -> Box<dyn ActionStateResult> { to_action_state_result(v) },
766
545
                )
767
594
            });
768
594
        Ok(Box::pin(stream))
769
597
    }
770
}
771
772
#[async_trait]
773
impl<T, I, NowFn> ClientStateManager for SimpleSchedulerStateManager<T, I, NowFn>
774
where
775
    T: AwaitedActionDb,
776
    I: InstantWrapper,
777
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
778
{
779
    async fn add_action(
780
        &self,
781
        client_operation_id: OperationId,
782
        action_info: Arc<ActionInfo>,
783
27
    ) -> Result<Box<dyn ActionStateResult>, Error> {
784
27
        let sub = self
785
27
            .inner_add_operation(client_operation_id, action_info.clone())
786
27
            .await
?0
;
787
788
27
        Ok(Box::new(ClientActionStateResult::new(
789
27
            sub,
790
27
            self.weak_self.clone(),
791
27
            self.no_event_action_timeout,
792
27
            self.now_fn.clone(),
793
27
        )))
794
54
    }
795
796
    async fn filter_operations<'a>(
797
        &'a self,
798
        filter: OperationFilter,
799
503
    ) -> Result<ActionStateResultStream<'a>, Error> {
800
503
        self.inner_filter_operations(filter, move |rx| {
801
503
            Box::new(ClientActionStateResult::new(
802
503
                rx,
803
503
                self.weak_self.clone(),
804
503
                self.no_event_action_timeout,
805
503
                self.now_fn.clone(),
806
503
            ))
807
503
        })
808
503
        .await
809
1.00k
    }
810
811
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
812
0
        None
813
0
    }
814
}
815
816
#[async_trait]
817
impl<T, I, NowFn> WorkerStateManager for SimpleSchedulerStateManager<T, I, NowFn>
818
where
819
    T: AwaitedActionDb,
820
    I: InstantWrapper,
821
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
822
{
823
    async fn update_operation(
824
        &self,
825
        operation_id: &OperationId,
826
        worker_id: &WorkerId,
827
        update: UpdateOperationType,
828
12
    ) -> Result<(), Error> {
829
12
        self.inner_update_operation(operation_id, Some(worker_id), update)
830
12
            .await
831
24
    }
832
}
833
834
#[async_trait]
835
impl<T, I, NowFn> MatchingEngineStateManager for SimpleSchedulerStateManager<T, I, NowFn>
836
where
837
    T: AwaitedActionDb,
838
    I: InstantWrapper,
839
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
840
{
841
    async fn filter_operations<'a>(
842
        &'a self,
843
        filter: OperationFilter,
844
94
    ) -> Result<ActionStateResultStream<'a>, Error> {
845
94
        self.inner_filter_operations(filter, |rx| {
846
45
            Box::new(MatchingEngineActionStateResult::new(
847
45
                rx,
848
45
                self.weak_self.clone(),
849
45
                self.no_event_action_timeout,
850
45
                self.now_fn.clone(),
851
45
            ))
852
94
        })
853
94
        .await
854
188
    }
855
856
    async fn assign_operation(
857
        &self,
858
        operation_id: &OperationId,
859
        worker_id_or_reason_for_unassign: Result<&WorkerId, Error>,
860
29
    ) -> Result<(), Error> {
861
29
        let (maybe_worker_id, update) = match worker_id_or_reason_for_unassign {
862
28
            Ok(worker_id) => (
863
28
                Some(worker_id),
864
28
                UpdateOperationType::UpdateWithActionStage(ActionStage::Executing),
865
28
            ),
866
1
            Err(err) => (None, UpdateOperationType::UpdateWithError(err)),
867
        };
868
29
        self.inner_update_operation(operation_id, maybe_worker_id, update)
869
29
            .await
870
58
    }
871
}