Coverage Report

Created: 2025-10-18 07:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/simple_scheduler_state_manager.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::ops::Bound;
16
use core::time::Duration;
17
use std::string::ToString;
18
use std::sync::{Arc, Weak};
19
20
use async_lock::Mutex;
21
use async_trait::async_trait;
22
use futures::{StreamExt, TryStreamExt, stream};
23
use nativelink_error::{Code, Error, ResultExt, make_err};
24
use nativelink_metric::MetricsComponent;
25
use nativelink_util::action_messages::{
26
    ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ExecutionMetadata,
27
    OperationId, WorkerId,
28
};
29
use nativelink_util::instant_wrapper::InstantWrapper;
30
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
31
use nativelink_util::operation_state_manager::{
32
    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
33
    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType, WorkerStateManager,
34
};
35
use nativelink_util::origin_event::OriginMetadata;
36
use tracing::{info, warn};
37
38
use super::awaited_action_db::{
39
    AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedActionState,
40
};
41
42
/// Maximum number of times an update to the database
43
/// can fail before giving up.
44
const MAX_UPDATE_RETRIES: usize = 5;
45
46
/// Simple struct that implements the `ActionStateResult` trait and always returns an error.
47
struct ErrorActionStateResult(Error);
48
49
#[async_trait]
50
impl ActionStateResult for ErrorActionStateResult {
51
0
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
52
        Err(self.0.clone())
53
0
    }
54
55
0
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
56
        Err(self.0.clone())
57
0
    }
58
59
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
60
        Err(self.0.clone())
61
0
    }
62
}
63
64
struct ClientActionStateResult<U, T, I, NowFn>
65
where
66
    U: AwaitedActionSubscriber,
67
    T: AwaitedActionDb,
68
    I: InstantWrapper,
69
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
70
{
71
    inner: MatchingEngineActionStateResult<U, T, I, NowFn>,
72
}
73
74
impl<U, T, I, NowFn> ClientActionStateResult<U, T, I, NowFn>
75
where
76
    U: AwaitedActionSubscriber,
77
    T: AwaitedActionDb,
78
    I: InstantWrapper,
79
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
80
{
81
534
    const fn new(
82
534
        sub: U,
83
534
        simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
84
534
        no_event_action_timeout: Duration,
85
534
        now_fn: NowFn,
86
534
    ) -> Self {
87
534
        Self {
88
534
            inner: MatchingEngineActionStateResult::new(
89
534
                sub,
90
534
                simple_scheduler_state_manager,
91
534
                no_event_action_timeout,
92
534
                now_fn,
93
534
            ),
94
534
        }
95
534
    }
96
}
97
98
#[async_trait]
99
impl<U, T, I, NowFn> ActionStateResult for ClientActionStateResult<U, T, I, NowFn>
100
where
101
    U: AwaitedActionSubscriber,
102
    T: AwaitedActionDb,
103
    I: InstantWrapper,
104
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
105
{
106
6
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
107
        self.inner.as_state().await
108
6
    }
109
110
50
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
111
        self.inner.changed().await
112
50
    }
113
114
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
115
        self.inner.as_action_info().await
116
0
    }
117
}
118
119
struct MatchingEngineActionStateResult<U, T, I, NowFn>
120
where
121
    U: AwaitedActionSubscriber,
122
    T: AwaitedActionDb,
123
    I: InstantWrapper,
124
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
125
{
126
    awaited_action_sub: U,
127
    simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
128
    no_event_action_timeout: Duration,
129
    now_fn: NowFn,
130
}
131
impl<U, T, I, NowFn> MatchingEngineActionStateResult<U, T, I, NowFn>
132
where
133
    U: AwaitedActionSubscriber,
134
    T: AwaitedActionDb,
135
    I: InstantWrapper,
136
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
137
{
138
584
    const fn new(
139
584
        awaited_action_sub: U,
140
584
        simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
141
584
        no_event_action_timeout: Duration,
142
584
        now_fn: NowFn,
143
584
    ) -> Self {
144
584
        Self {
145
584
            awaited_action_sub,
146
584
            simple_scheduler_state_manager,
147
584
            no_event_action_timeout,
148
584
            now_fn,
149
584
        }
150
584
    }
151
}
152
153
#[async_trait]
154
impl<U, T, I, NowFn> ActionStateResult for MatchingEngineActionStateResult<U, T, I, NowFn>
155
where
156
    U: AwaitedActionSubscriber,
157
    T: AwaitedActionDb,
158
    I: InstantWrapper,
159
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
160
{
161
38
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
162
        let awaited_action = self
163
            .awaited_action_sub
164
            .borrow()
165
            .await
166
            .err_tip(|| "In MatchingEngineActionStateResult::as_state")?;
167
        Ok((
168
            awaited_action.state().clone(),
169
            awaited_action.maybe_origin_metadata().cloned(),
170
        ))
171
38
    }
172
173
50
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
174
        let mut timeout_attempts = 0;
175
        loop {
176
            tokio::select! {
177
                awaited_action_result = self.awaited_action_sub.changed() => {
178
                    return awaited_action_result
179
                        .err_tip(|| "In MatchingEngineActionStateResult::changed")
180
46
                        .map(|v| (v.state().clone(), v.maybe_origin_metadata().cloned()));
181
                }
182
                () = (self.now_fn)().sleep(self.no_event_action_timeout) => {
183
                    // Timeout happened, do additional checks below.
184
                }
185
            }
186
187
            let awaited_action = self
188
                .awaited_action_sub
189
                .borrow()
190
                .await
191
                .err_tip(|| "In MatchingEngineActionStateResult::changed")?;
192
193
            if matches!(awaited_action.state().stage, ActionStage::Queued) {
194
                // Actions in queued state do not get periodically updated,
195
                // so we don't need to time them out.
196
                continue;
197
            }
198
199
            let simple_scheduler_state_manager = self
200
                .simple_scheduler_state_manager
201
                .upgrade()
202
0
                .err_tip(|| format!("Failed to upgrade weak reference to SimpleSchedulerStateManager in MatchingEngineActionStateResult::changed at attempt: {timeout_attempts}"))?;
203
204
            warn!(
205
                ?awaited_action,
206
                "OperationId {} / {} timed out after {} seconds issuing a retry",
207
                awaited_action.operation_id(),
208
                awaited_action.state().client_operation_id,
209
                self.no_event_action_timeout.as_secs_f32(),
210
            );
211
212
            simple_scheduler_state_manager
213
                .timeout_operation_id(awaited_action.operation_id())
214
                .await
215
                .err_tip(|| "In MatchingEngineActionStateResult::changed")?;
216
217
            if timeout_attempts >= MAX_UPDATE_RETRIES {
218
                return Err(make_err!(
219
                    Code::Internal,
220
                    "Failed to update action after {} retries with no error set in MatchingEngineActionStateResult::changed - {} {:?}",
221
                    MAX_UPDATE_RETRIES,
222
                    awaited_action.operation_id(),
223
                    awaited_action.state().stage,
224
                ));
225
            }
226
            timeout_attempts += 1;
227
        }
228
50
    }
229
230
50
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
231
        let awaited_action = self
232
            .awaited_action_sub
233
            .borrow()
234
            .await
235
            .err_tip(|| "In MatchingEngineActionStateResult::as_action_info")?;
236
        Ok((
237
            awaited_action.action_info().clone(),
238
            awaited_action.maybe_origin_metadata().cloned(),
239
        ))
240
50
    }
241
}
242
243
/// `SimpleSchedulerStateManager` is responsible for maintaining the state of the scheduler.
244
/// Scheduler state includes the actions that are queued, active, and recently completed.
245
/// It also includes the workers that are available to execute actions based on allocation
246
/// strategy.
247
#[derive(MetricsComponent)]
248
pub(crate) struct SimpleSchedulerStateManager<T, I, NowFn>
249
where
250
    T: AwaitedActionDb,
251
    I: InstantWrapper,
252
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
253
{
254
    /// Database for storing the state of all actions.
255
    #[metric(group = "action_db")]
256
    action_db: T,
257
258
    /// Maximum number of times a job can be retried.
259
    // TODO(palfrey) This should be a scheduler decorator instead
260
    // of always having it on every SimpleScheduler.
261
    #[metric(help = "Maximum number of times a job can be retried")]
262
    max_job_retries: usize,
263
264
    /// Duration after which an action is considered to be timed out if
265
    /// no event is received.
266
    #[metric(
267
        help = "Duration after which an action is considered to be timed out if no event is received"
268
    )]
269
    no_event_action_timeout: Duration,
270
271
    /// Mark operation as timed out if no client has sent a keepalive in this duration.
272
    /// This is used to prevent operations from lingering forever once no
273
    /// client is listening for their result anymore.
274
    client_action_timeout: Duration,
275
276
    // A lock to ensure only one timeout operation is running at a time
277
    // on this service.
278
    timeout_operation_mux: Mutex<()>,
279
280
    /// Weak reference to self.
281
    // We use a weak reference to reduce the risk of a memory leak from
282
    // future changes. If this becomes some kind of performance issue,
283
    // we can consider using a strong reference.
284
    weak_self: Weak<Self>,
285
286
    /// Function to get the current time.
287
    now_fn: NowFn,
288
}
289
290
impl<T, I, NowFn> SimpleSchedulerStateManager<T, I, NowFn>
291
where
292
    T: AwaitedActionDb,
293
    I: InstantWrapper,
294
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
295
{
296
23
    pub(crate) fn new(
297
23
        max_job_retries: usize,
298
23
        no_event_action_timeout: Duration,
299
23
        client_action_timeout: Duration,
300
23
        action_db: T,
301
23
        now_fn: NowFn,
302
23
    ) -> Arc<Self> {
303
23
        Arc::new_cyclic(|weak_self| Self {
304
23
            action_db,
305
23
            max_job_retries,
306
23
            no_event_action_timeout,
307
23
            client_action_timeout,
308
23
            timeout_operation_mux: Mutex::new(()),
309
23
            weak_self: weak_self.clone(),
310
23
            now_fn,
311
23
        })
312
23
    }
313
314
554
    async fn apply_filter_predicate(
315
554
        &self,
316
554
        awaited_action: &AwaitedAction,
317
554
        subscriber: &T::Subscriber,
318
554
        filter: &OperationFilter,
319
554
    ) -> bool {
320
        // Note: The caller must filter `client_operation_id`.
321
322
554
        let mut maybe_reloaded_awaited_action: Option<AwaitedAction> = None;
323
554
        if awaited_action.last_client_keepalive_timestamp() + self.client_action_timeout
  Branch (323:12): [True: 0, False: 0]
  Branch (323:12): [True: 0, False: 0]
  Branch (323:12): [Folded - Ignored]
  Branch (323:12): [True: 0, False: 6]
  Branch (323:12): [True: 0, False: 546]
  Branch (323:12): [True: 0, False: 2]
324
554
            < (self.now_fn)().now()
325
        {
326
            // This may change if the version is out of date.
327
0
            let mut timed_out = true;
328
0
            if !awaited_action.state().stage.is_finished() {
  Branch (328:16): [True: 0, False: 0]
  Branch (328:16): [True: 0, False: 0]
  Branch (328:16): [Folded - Ignored]
  Branch (328:16): [True: 0, False: 0]
  Branch (328:16): [True: 0, False: 0]
  Branch (328:16): [True: 0, False: 0]
329
0
                let mut state = awaited_action.state().as_ref().clone();
330
0
                state.stage = ActionStage::Completed(ActionResult {
331
0
                    error: Some(make_err!(
332
0
                        Code::DeadlineExceeded,
333
0
                        "Operation timed out {} seconds of having no more clients listening",
334
0
                        self.client_action_timeout.as_secs_f32(),
335
0
                    )),
336
0
                    ..ActionResult::default()
337
0
                });
338
0
                let state = Arc::new(state);
339
                // We may be competing with a client timestamp update, so try
340
                // this a few times.
341
0
                for attempt in 1..=MAX_UPDATE_RETRIES {
342
0
                    let mut new_awaited_action = match &maybe_reloaded_awaited_action {
343
0
                        None => awaited_action.clone(),
344
0
                        Some(reloaded_awaited_action) => reloaded_awaited_action.clone(),
345
                    };
346
0
                    new_awaited_action.worker_set_state(state.clone(), (self.now_fn)().now());
347
0
                    let err = match self
348
0
                        .action_db
349
0
                        .update_awaited_action(new_awaited_action)
350
0
                        .await
351
                    {
352
0
                        Ok(()) => break,
353
0
                        Err(err) => err,
354
                    };
355
                    // Reload from the database if the action was outdated.
356
0
                    let maybe_awaited_action =
357
0
                        if attempt == MAX_UPDATE_RETRIES || err.code != Code::Aborted {
  Branch (357:28): [True: 0, False: 0]
  Branch (357:61): [True: 0, False: 0]
  Branch (357:28): [True: 0, False: 0]
  Branch (357:61): [True: 0, False: 0]
  Branch (357:28): [Folded - Ignored]
  Branch (357:61): [Folded - Ignored]
  Branch (357:28): [True: 0, False: 0]
  Branch (357:61): [True: 0, False: 0]
  Branch (357:28): [True: 0, False: 0]
  Branch (357:61): [True: 0, False: 0]
  Branch (357:28): [True: 0, False: 0]
  Branch (357:61): [True: 0, False: 0]
358
0
                            None
359
                        } else {
360
0
                            subscriber.borrow().await.ok()
361
                        };
362
0
                    if let Some(reloaded_awaited_action) = maybe_awaited_action {
  Branch (362:28): [True: 0, False: 0]
  Branch (362:28): [True: 0, False: 0]
  Branch (362:28): [Folded - Ignored]
  Branch (362:28): [True: 0, False: 0]
  Branch (362:28): [True: 0, False: 0]
  Branch (362:28): [True: 0, False: 0]
363
0
                        maybe_reloaded_awaited_action = Some(reloaded_awaited_action);
364
0
                    } else {
365
0
                        warn!(
366
0
                            "Failed to update action to timed out state after client keepalive timeout. This is ok if multiple schedulers tried to set the state at the same time: {err}",
367
                        );
368
0
                        break;
369
                    }
370
                    // Re-check the predicate after reload.
371
0
                    if maybe_reloaded_awaited_action
  Branch (371:24): [True: 0, False: 0]
  Branch (371:24): [True: 0, False: 0]
  Branch (371:24): [Folded - Ignored]
  Branch (371:24): [True: 0, False: 0]
  Branch (371:24): [True: 0, False: 0]
  Branch (371:24): [True: 0, False: 0]
372
0
                        .as_ref()
373
0
                        .is_some_and(|awaited_action| {
374
0
                            awaited_action.last_client_keepalive_timestamp()
375
0
                                + self.client_action_timeout
376
0
                                >= (self.now_fn)().now()
377
0
                        })
378
                    {
379
0
                        timed_out = false;
380
0
                        break;
381
0
                    } else if maybe_reloaded_awaited_action
  Branch (381:31): [True: 0, False: 0]
  Branch (381:31): [True: 0, False: 0]
  Branch (381:31): [Folded - Ignored]
  Branch (381:31): [True: 0, False: 0]
  Branch (381:31): [True: 0, False: 0]
  Branch (381:31): [True: 0, False: 0]
382
0
                        .as_ref()
383
0
                        .is_some_and(|awaited_action| awaited_action.state().stage.is_finished())
384
                    {
385
0
                        break;
386
0
                    }
387
                }
388
0
            }
389
0
            if timed_out {
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [Folded - Ignored]
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [True: 0, False: 0]
390
0
                return false;
391
0
            }
392
554
        }
393
        // If the action was reloaded, then use that for the rest of the checks
394
        // instead of the input parameter.
395
554
        let awaited_action = maybe_reloaded_awaited_action
396
554
            .as_ref()
397
554
            .unwrap_or(awaited_action);
398
399
554
        if let Some(
operation_id0
) = &filter.operation_id {
  Branch (399:16): [True: 0, False: 0]
  Branch (399:16): [True: 0, False: 0]
  Branch (399:16): [Folded - Ignored]
  Branch (399:16): [True: 0, False: 6]
  Branch (399:16): [True: 0, False: 546]
  Branch (399:16): [True: 0, False: 2]
400
0
            if operation_id != awaited_action.operation_id() {
  Branch (400:16): [True: 0, False: 0]
  Branch (400:16): [True: 0, False: 0]
  Branch (400:16): [Folded - Ignored]
  Branch (400:16): [True: 0, False: 0]
  Branch (400:16): [True: 0, False: 0]
  Branch (400:16): [True: 0, False: 0]
401
0
                return false;
402
0
            }
403
554
        }
404
405
554
        if filter.worker_id.is_some() && 
filter.worker_id.as_ref()0
!= awaited_action.worker_id() {
  Branch (405:12): [True: 0, False: 0]
  Branch (405:42): [True: 0, False: 0]
  Branch (405:12): [True: 0, False: 0]
  Branch (405:42): [True: 0, False: 0]
  Branch (405:12): [Folded - Ignored]
  Branch (405:42): [Folded - Ignored]
  Branch (405:12): [True: 0, False: 6]
  Branch (405:42): [True: 0, False: 0]
  Branch (405:12): [True: 0, False: 546]
  Branch (405:42): [True: 0, False: 0]
  Branch (405:12): [True: 0, False: 2]
  Branch (405:42): [True: 0, False: 0]
406
0
            return false;
407
554
        }
408
409
        {
410
554
            if let Some(
filter_unique_key0
) = &filter.unique_key {
  Branch (410:20): [True: 0, False: 0]
  Branch (410:20): [True: 0, False: 0]
  Branch (410:20): [Folded - Ignored]
  Branch (410:20): [True: 0, False: 6]
  Branch (410:20): [True: 0, False: 546]
  Branch (410:20): [True: 0, False: 2]
411
0
                match &awaited_action.action_info().unique_qualifier {
412
0
                    ActionUniqueQualifier::Cacheable(unique_key) => {
413
0
                        if filter_unique_key != unique_key {
  Branch (413:28): [True: 0, False: 0]
  Branch (413:28): [True: 0, False: 0]
  Branch (413:28): [Folded - Ignored]
  Branch (413:28): [True: 0, False: 0]
  Branch (413:28): [True: 0, False: 0]
  Branch (413:28): [True: 0, False: 0]
414
0
                            return false;
415
0
                        }
416
                    }
417
                    ActionUniqueQualifier::Uncacheable(_) => {
418
0
                        return false;
419
                    }
420
                }
421
554
            }
422
554
            if let Some(
action_digest0
) = filter.action_digest {
  Branch (422:20): [True: 0, False: 0]
  Branch (422:20): [True: 0, False: 0]
  Branch (422:20): [Folded - Ignored]
  Branch (422:20): [True: 0, False: 6]
  Branch (422:20): [True: 0, False: 546]
  Branch (422:20): [True: 0, False: 2]
423
0
                if action_digest != awaited_action.action_info().digest() {
  Branch (423:20): [True: 0, False: 0]
  Branch (423:20): [True: 0, False: 0]
  Branch (423:20): [Folded - Ignored]
  Branch (423:20): [True: 0, False: 0]
  Branch (423:20): [True: 0, False: 0]
  Branch (423:20): [True: 0, False: 0]
424
0
                    return false;
425
0
                }
426
554
            }
427
        }
428
429
        {
430
554
            let last_worker_update_timestamp = awaited_action.last_worker_updated_timestamp();
431
554
            if let Some(
worker_update_before0
) = filter.worker_update_before {
  Branch (431:20): [True: 0, False: 0]
  Branch (431:20): [True: 0, False: 0]
  Branch (431:20): [Folded - Ignored]
  Branch (431:20): [True: 0, False: 6]
  Branch (431:20): [True: 0, False: 546]
  Branch (431:20): [True: 0, False: 2]
432
0
                if worker_update_before < last_worker_update_timestamp {
  Branch (432:20): [True: 0, False: 0]
  Branch (432:20): [True: 0, False: 0]
  Branch (432:20): [Folded - Ignored]
  Branch (432:20): [True: 0, False: 0]
  Branch (432:20): [True: 0, False: 0]
  Branch (432:20): [True: 0, False: 0]
433
0
                    return false;
434
0
                }
435
554
            }
436
554
            if let Some(
completed_before0
) = filter.completed_before {
  Branch (436:20): [True: 0, False: 0]
  Branch (436:20): [True: 0, False: 0]
  Branch (436:20): [Folded - Ignored]
  Branch (436:20): [True: 0, False: 6]
  Branch (436:20): [True: 0, False: 546]
  Branch (436:20): [True: 0, False: 2]
437
0
                if awaited_action.state().stage.is_finished()
  Branch (437:20): [True: 0, False: 0]
  Branch (437:20): [True: 0, False: 0]
  Branch (437:20): [Folded - Ignored]
  Branch (437:20): [True: 0, False: 0]
  Branch (437:20): [True: 0, False: 0]
  Branch (437:20): [True: 0, False: 0]
438
0
                    && completed_before < last_worker_update_timestamp
  Branch (438:24): [True: 0, False: 0]
  Branch (438:24): [True: 0, False: 0]
  Branch (438:24): [Folded - Ignored]
  Branch (438:24): [True: 0, False: 0]
  Branch (438:24): [True: 0, False: 0]
  Branch (438:24): [True: 0, False: 0]
439
                {
440
0
                    return false;
441
0
                }
442
554
            }
443
554
            if filter.stages != OperationStageFlags::Any {
  Branch (443:16): [True: 0, False: 0]
  Branch (443:16): [True: 0, False: 0]
  Branch (443:16): [Folded - Ignored]
  Branch (443:16): [True: 5, False: 1]
  Branch (443:16): [True: 543, False: 3]
  Branch (443:16): [True: 2, False: 0]
444
550
                let stage_flag = match awaited_action.state().stage {
445
0
                    ActionStage::Unknown => OperationStageFlags::Any,
446
0
                    ActionStage::CacheCheck => OperationStageFlags::CacheCheck,
447
550
                    ActionStage::Queued => OperationStageFlags::Queued,
448
0
                    ActionStage::Executing => OperationStageFlags::Executing,
449
                    ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => {
450
0
                        OperationStageFlags::Completed
451
                    }
452
                };
453
550
                if !filter.stages.intersects(stage_flag) {
  Branch (453:20): [True: 0, False: 0]
  Branch (453:20): [True: 0, False: 0]
  Branch (453:20): [Folded - Ignored]
  Branch (453:20): [True: 0, False: 5]
  Branch (453:20): [True: 0, False: 543]
  Branch (453:20): [True: 0, False: 2]
454
0
                    return false;
455
550
                }
456
4
            }
457
        }
458
459
554
        true
460
554
    }
461
462
    /// Let the scheduler know that an operation has timed out because its
463
    /// worker has not sent an update within `no_event_action_timeout`.
464
1
    async fn timeout_operation_id(&self, operation_id: &OperationId) -> Result<(), Error> {
465
        // Ensure that only one timeout operation is running at a time.
466
        // Failing to do this could result in the same operation being
467
        // timed out multiple times at the same time.
468
        // Note: We could implement this on a per-operation_id basis, but it is quite
469
        // complex to manage the locks.
470
1
        let _lock = self.timeout_operation_mux.lock().await;
471
472
1
        let awaited_action_subscriber = self
473
1
            .action_db
474
1
            .get_by_operation_id(operation_id)
475
1
            .await
476
1
            .err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")
?0
477
1
            .err_tip(|| 
{0
478
0
                format!("Operation id {operation_id} does not exist in SimpleSchedulerStateManager::timeout_operation_id")
479
0
            })?;
480
481
1
        let awaited_action = awaited_action_subscriber
482
1
            .borrow()
483
1
            .await
484
1
            .err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")
?0
;
485
486
        // If the action is not executing, we should not time out the action.
487
1
        if !
matches!0
(awaited_action.state().stage, ActionStage::Executing) {
  Branch (487:12): [True: 0, False: 0]
  Branch (487:12): [True: 0, False: 0]
  Branch (487:12): [Folded - Ignored]
  Branch (487:12): [True: 0, False: 0]
  Branch (487:12): [True: 0, False: 1]
  Branch (487:12): [True: 0, False: 0]
488
0
            return Ok(());
489
1
        }
490
491
1
        let worker_should_update_before = awaited_action
492
1
            .last_worker_updated_timestamp()
493
1
            .checked_add(self.no_event_action_timeout)
494
1
            .ok_or_else(|| 
{0
495
0
                make_err!(
496
0
                    Code::Internal,
497
                    "Timestamp overflow for operation {operation_id} in SimpleSchedulerStateManager::timeout_operation_id"
498
                )
499
0
            })?;
500
1
        if worker_should_update_before >= (self.now_fn)().now() {
  Branch (500:12): [True: 0, False: 0]
  Branch (500:12): [True: 0, False: 0]
  Branch (500:12): [Folded - Ignored]
  Branch (500:12): [True: 0, False: 0]
  Branch (500:12): [True: 0, False: 1]
  Branch (500:12): [True: 0, False: 0]
501
            // The action was updated recently, so we should not time out the action.
502
            // This is to prevent timing out actions that have recently been updated
503
            // (e.g. when multiple clients time out the same action at the same time).
504
0
            return Ok(());
505
1
        }
506
507
1
        self.assign_operation(
508
1
            operation_id,
509
1
            Err(make_err!(
510
1
                Code::DeadlineExceeded,
511
1
                "Operation timed out after {} seconds",
512
1
                self.no_event_action_timeout.as_secs_f32(),
513
1
            )),
514
1
        )
515
1
        .await
516
1
    }
517
518
49
    async fn inner_update_operation(
519
49
        &self,
520
49
        operation_id: &OperationId,
521
49
        maybe_worker_id: Option<&WorkerId>,
522
49
        update: UpdateOperationType,
523
49
    ) -> Result<(), Error> {
524
49
        let mut last_err = None;
525
50
        for _ in 0..MAX_UPDATE_RETRIES {
526
50
            let maybe_awaited_action_subscriber = self
527
50
                .action_db
528
50
                .get_by_operation_id(operation_id)
529
50
                .await
530
50
                .err_tip(|| "In SimpleSchedulerStateManager::update_operation")
?0
;
531
50
            let Some(
awaited_action_subscriber49
) = maybe_awaited_action_subscriber else {
  Branch (531:17): [True: 0, False: 0]
  Branch (531:17): [True: 0, False: 0]
  Branch (531:17): [Folded - Ignored]
  Branch (531:17): [True: 8, False: 0]
  Branch (531:17): [True: 39, False: 0]
  Branch (531:17): [True: 2, False: 1]
532
                // No action found. It is ok if the action was not found. It
533
                // probably means that the action was dropped, but the worker was
534
                // still processing it.
535
1
                return Ok(());
536
            };
537
538
49
            let mut awaited_action = awaited_action_subscriber
539
49
                .borrow()
540
49
                .await
541
49
                .err_tip(|| "In SimpleSchedulerStateManager::update_operation")
?0
;
542
543
            // Make sure the worker id matches the awaited action worker id.
544
            // This might happen if the worker sending the update is not the
545
            // worker that was assigned.
546
49
            if awaited_action.worker_id().is_some()
  Branch (546:16): [True: 0, False: 0]
  Branch (546:16): [True: 0, False: 0]
  Branch (546:16): [Folded - Ignored]
  Branch (546:16): [True: 4, False: 4]
  Branch (546:16): [True: 13, False: 26]
  Branch (546:16): [True: 0, False: 2]
547
17
                && maybe_worker_id.is_some()
  Branch (547:20): [True: 0, False: 0]
  Branch (547:20): [True: 0, False: 0]
  Branch (547:20): [Folded - Ignored]
  Branch (547:20): [True: 4, False: 0]
  Branch (547:20): [True: 12, False: 1]
  Branch (547:20): [True: 0, False: 0]
548
16
                && maybe_worker_id != awaited_action.worker_id()
  Branch (548:20): [True: 0, False: 0]
  Branch (548:20): [True: 0, False: 0]
  Branch (548:20): [Folded - Ignored]
  Branch (548:20): [True: 0, False: 4]
  Branch (548:20): [True: 0, False: 12]
  Branch (548:20): [True: 0, False: 0]
549
            {
550
                // If another worker is already assigned to the action, another
551
                // worker probably picked up the action. We should not update the
552
                // action in this case and abort this operation.
553
0
                let err = make_err!(
554
0
                    Code::Aborted,
555
                    "Worker ids do not match - {:?} != {:?} for {:?}",
556
                    maybe_worker_id,
557
0
                    awaited_action.worker_id(),
558
                    awaited_action,
559
                );
560
0
                info!(
561
0
                    "Worker ids do not match - {:?} != {:?} for {:?}. This is probably due to another worker picking up the action.",
562
                    maybe_worker_id,
563
0
                    awaited_action.worker_id(),
564
                    awaited_action,
565
                );
566
0
                return Err(err);
567
49
            }
568
569
            // Make sure we don't update an action that is already completed.
570
49
            if awaited_action.state().stage.is_finished() {
  Branch (570:16): [True: 0, False: 0]
  Branch (570:16): [True: 0, False: 0]
  Branch (570:16): [Folded - Ignored]
  Branch (570:16): [True: 0, False: 8]
  Branch (570:16): [True: 0, False: 39]
  Branch (570:16): [True: 0, False: 2]
571
0
                match &update {
572
                    UpdateOperationType::UpdateWithDisconnect | UpdateOperationType::KeepAlive => {
573
                        // No need to error a keep-alive when it's completed, it's just
574
                        // unnecessary log noise.
575
0
                        return Ok(());
576
                    }
577
                    _ => {
578
0
                        return Err(make_err!(
579
0
                            Code::Internal,
580
0
                            "Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}",
581
0
                            awaited_action.state().stage,
582
0
                            maybe_worker_id,
583
0
                        ));
584
                    }
585
                }
586
49
            }
587
588
49
            let stage = match &update {
589
                UpdateOperationType::KeepAlive => {
590
0
                    awaited_action.worker_keep_alive((self.now_fn)().now());
591
0
                    match self
592
0
                        .action_db
593
0
                        .update_awaited_action(awaited_action)
594
0
                        .await
595
0
                        .err_tip(|| "Failed to send KeepAlive in SimpleSchedulerStateManager::update_operation") {
596
                        // Try again if there was a version mismatch.
597
0
                        Err(err) if err.code == Code::Aborted => {
  Branch (597:37): [True: 0, False: 0]
  Branch (597:37): [True: 0, False: 0]
  Branch (597:37): [Folded - Ignored]
  Branch (597:37): [True: 0, False: 0]
  Branch (597:37): [True: 0, False: 0]
  Branch (597:37): [True: 0, False: 0]
598
0
                            last_err = Some(err);
599
0
                            continue;
600
                        }
601
0
                        result => return result,
602
                    }
603
                }
604
37
                UpdateOperationType::UpdateWithActionStage(stage) => stage.clone(),
605
12
                UpdateOperationType::UpdateWithError(err) => {
606
                    // Don't count a backpressure failure as an attempt for an action.
607
12
                    let due_to_backpressure = err.code == Code::ResourceExhausted;
608
12
                    if !due_to_backpressure {
  Branch (608:24): [True: 0, False: 0]
  Branch (608:24): [True: 0, False: 0]
  Branch (608:24): [Folded - Ignored]
  Branch (608:24): [True: 4, False: 0]
  Branch (608:24): [True: 8, False: 0]
  Branch (608:24): [True: 0, False: 0]
609
12
                        awaited_action.attempts += 1;
610
12
                    
}0
611
612
12
                    if awaited_action.attempts > self.max_job_retries {
  Branch (612:24): [True: 0, False: 0]
  Branch (612:24): [True: 0, False: 0]
  Branch (612:24): [Folded - Ignored]
  Branch (612:24): [True: 1, False: 3]
  Branch (612:24): [True: 1, False: 7]
  Branch (612:24): [True: 0, False: 0]
613
2
                        ActionStage::Completed(ActionResult {
614
2
                            execution_metadata: ExecutionMetadata {
615
2
                                worker: maybe_worker_id.map_or_else(String::default, ToString::to_string),
616
2
                                ..ExecutionMetadata::default()
617
2
                            },
618
2
                            error: Some(err.clone().merge(make_err!(
619
2
                                Code::Internal,
620
2
                                "Job cancelled because it attempted to execute too many times {} > {} times {}",
621
2
                                awaited_action.attempts,
622
2
                                self.max_job_retries,
623
2
                                format!("for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"),
624
2
                            ))),
625
2
                            ..ActionResult::default()
626
2
                        })
627
                    } else {
628
10
                        ActionStage::Queued
629
                    }
630
                }
631
0
                UpdateOperationType::UpdateWithDisconnect => ActionStage::Queued,
632
                // We shouldn't get here, but we just ignore it if we do.
633
                UpdateOperationType::ExecutionComplete => {
634
0
                    warn!("inner_update_operation got an ExecutionComplete, that's unexpected.");
635
0
                    return Ok(());
636
                }
637
            };
638
49
            let now = (self.now_fn)().now();
639
49
            if 
matches!39
(stage, ActionStage::Queued) {
640
10
                // If the action is queued, we need to unset the worker id regardless of
641
10
                // which worker sent the update.
642
10
                awaited_action.set_worker_id(None, now);
643
39
            } else {
644
39
                awaited_action.set_worker_id(maybe_worker_id.cloned(), now);
645
39
            }
646
49
            awaited_action.worker_set_state(
647
49
                Arc::new(ActionState {
648
49
                    stage,
649
49
                    // Client id is not known here, it is the responsibility of
650
49
                    // the the subscriber impl to replace this with the
651
49
                    // correct client id.
652
49
                    client_operation_id: operation_id.clone(),
653
49
                    action_digest: awaited_action.action_info().digest(),
654
49
                }),
655
49
                now,
656
            );
657
658
49
            let update_action_result = self
659
49
                .action_db
660
49
                .update_awaited_action(awaited_action)
661
49
                .await
662
49
                .err_tip(|| "In SimpleSchedulerStateManager::update_operation");
663
49
            if let Err(
err2
) = update_action_result {
  Branch (663:20): [True: 0, False: 0]
  Branch (663:20): [True: 0, False: 0]
  Branch (663:20): [Folded - Ignored]
  Branch (663:20): [True: 0, False: 8]
  Branch (663:20): [True: 0, False: 39]
  Branch (663:20): [True: 2, False: 0]
664
                // We use Aborted to signal that the action was not
665
                // updated due to the data being set was not the latest
666
                // but can be retried.
667
2
                if err.code == Code::Aborted {
  Branch (667:20): [True: 0, False: 0]
  Branch (667:20): [True: 0, False: 0]
  Branch (667:20): [Folded - Ignored]
  Branch (667:20): [True: 0, False: 0]
  Branch (667:20): [True: 0, False: 0]
  Branch (667:20): [True: 1, False: 1]
668
1
                    last_err = Some(err);
669
1
                    continue;
670
1
                }
671
1
                return Err(err);
672
47
            }
673
47
            return Ok(());
674
        }
675
0
        Err(last_err.unwrap_or_else(|| {
676
0
            make_err!(
677
0
                Code::Internal,
678
                "Failed to update action after {} retries with no error set",
679
                MAX_UPDATE_RETRIES,
680
            )
681
0
        }))
682
49
    }
683
684
30
    async fn inner_add_operation(
685
30
        &self,
686
30
        new_client_operation_id: OperationId,
687
30
        action_info: Arc<ActionInfo>,
688
30
    ) -> Result<T::Subscriber, Error> {
689
30
        self.action_db
690
30
            .add_action(
691
30
                new_client_operation_id,
692
30
                action_info,
693
30
                self.no_event_action_timeout,
694
30
            )
695
30
            .await
696
30
            .err_tip(|| "In SimpleSchedulerStateManager::add_operation")
697
30
    }
698
699
615
    async fn inner_filter_operations<'a, F>(
700
615
        &'a self,
701
615
        filter: OperationFilter,
702
615
        to_action_state_result: F,
703
615
    ) -> Result<ActionStateResultStream<'a>, Error>
704
615
    where
705
615
        F: Fn(T::Subscriber) -> Box<dyn ActionStateResult> + Send + Sync + 'a,
706
615
    {
707
611
        const fn sorted_awaited_action_state_for_flags(
708
611
            stage: OperationStageFlags,
709
611
        ) -> Option<SortedAwaitedActionState> {
710
611
            match stage {
711
0
                OperationStageFlags::CacheCheck => Some(SortedAwaitedActionState::CacheCheck),
712
611
                OperationStageFlags::Queued => Some(SortedAwaitedActionState::Queued),
713
0
                OperationStageFlags::Executing => Some(SortedAwaitedActionState::Executing),
714
0
                OperationStageFlags::Completed => Some(SortedAwaitedActionState::Completed),
715
0
                _ => None,
716
            }
717
611
        }
718
719
615
        if let Some(
operation_id0
) = &filter.operation_id {
  Branch (719:16): [True: 0, False: 0]
  Branch (719:16): [True: 0, False: 0]
  Branch (719:16): [True: 0, False: 0]
  Branch (719:16): [True: 0, False: 0]
  Branch (719:16): [Folded - Ignored]
  Branch (719:16): [True: 0, False: 1]
  Branch (719:16): [True: 0, False: 17]
  Branch (719:16): [True: 0, False: 503]
  Branch (719:16): [True: 0, False: 90]
  Branch (719:16): [True: 0, False: 0]
  Branch (719:16): [True: 0, False: 4]
720
0
            let maybe_subscriber = self
721
0
                .action_db
722
0
                .get_by_operation_id(operation_id)
723
0
                .await
724
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
725
0
            let Some(subscriber) = maybe_subscriber else {
  Branch (725:17): [True: 0, False: 0]
  Branch (725:17): [True: 0, False: 0]
  Branch (725:17): [True: 0, False: 0]
  Branch (725:17): [True: 0, False: 0]
  Branch (725:17): [Folded - Ignored]
  Branch (725:17): [True: 0, False: 0]
  Branch (725:17): [True: 0, False: 0]
  Branch (725:17): [True: 0, False: 0]
  Branch (725:17): [True: 0, False: 0]
  Branch (725:17): [True: 0, False: 0]
  Branch (725:17): [True: 0, False: 0]
726
0
                return Ok(Box::pin(stream::empty()));
727
            };
728
0
            let awaited_action = subscriber
729
0
                .borrow()
730
0
                .await
731
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
732
0
            if !self
  Branch (732:16): [True: 0, False: 0]
  Branch (732:16): [True: 0, False: 0]
  Branch (732:16): [True: 0, False: 0]
  Branch (732:16): [True: 0, False: 0]
  Branch (732:16): [Folded - Ignored]
  Branch (732:16): [True: 0, False: 0]
  Branch (732:16): [True: 0, False: 0]
  Branch (732:16): [True: 0, False: 0]
  Branch (732:16): [True: 0, False: 0]
  Branch (732:16): [True: 0, False: 0]
  Branch (732:16): [True: 0, False: 0]
733
0
                .apply_filter_predicate(&awaited_action, &subscriber, &filter)
734
0
                .await
735
            {
736
0
                return Ok(Box::pin(stream::empty()));
737
0
            }
738
0
            return Ok(Box::pin(stream::once(async move {
739
0
                to_action_state_result(subscriber)
740
0
            })));
741
615
        }
742
615
        if let Some(
client_operation_id4
) = &filter.client_operation_id {
  Branch (742:16): [True: 0, False: 0]
  Branch (742:16): [True: 0, False: 0]
  Branch (742:16): [True: 0, False: 0]
  Branch (742:16): [True: 0, False: 0]
  Branch (742:16): [Folded - Ignored]
  Branch (742:16): [True: 1, False: 0]
  Branch (742:16): [True: 0, False: 17]
  Branch (742:16): [True: 3, False: 500]
  Branch (742:16): [True: 0, False: 90]
  Branch (742:16): [True: 0, False: 0]
  Branch (742:16): [True: 0, False: 4]
743
4
            let maybe_subscriber = self
744
4
                .action_db
745
4
                .get_awaited_action_by_id(client_operation_id)
746
4
                .await
747
4
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
;
748
4
            let Some(subscriber) = maybe_subscriber else {
  Branch (748:17): [True: 0, False: 0]
  Branch (748:17): [True: 0, False: 0]
  Branch (748:17): [True: 0, False: 0]
  Branch (748:17): [True: 0, False: 0]
  Branch (748:17): [Folded - Ignored]
  Branch (748:17): [True: 1, False: 0]
  Branch (748:17): [True: 0, False: 0]
  Branch (748:17): [True: 3, False: 0]
  Branch (748:17): [True: 0, False: 0]
  Branch (748:17): [True: 0, False: 0]
  Branch (748:17): [True: 0, False: 0]
749
0
                return Ok(Box::pin(stream::empty()));
750
            };
751
4
            let awaited_action = subscriber
752
4
                .borrow()
753
4
                .await
754
4
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
;
755
4
            if !self
  Branch (755:16): [True: 0, False: 0]
  Branch (755:16): [True: 0, False: 0]
  Branch (755:16): [True: 0, False: 0]
  Branch (755:16): [True: 0, False: 0]
  Branch (755:16): [Folded - Ignored]
  Branch (755:16): [True: 0, False: 1]
  Branch (755:16): [True: 0, False: 0]
  Branch (755:16): [True: 0, False: 3]
  Branch (755:16): [True: 0, False: 0]
  Branch (755:16): [True: 0, False: 0]
  Branch (755:16): [True: 0, False: 0]
756
4
                .apply_filter_predicate(&awaited_action, &subscriber, &filter)
757
4
                .await
758
            {
759
0
                return Ok(Box::pin(stream::empty()));
760
4
            }
761
4
            return Ok(Box::pin(stream::once(async move {
762
4
                to_action_state_result(subscriber)
763
4
            })));
764
611
        }
765
766
611
        let Some(sorted_awaited_action_state) =
  Branch (766:13): [True: 0, False: 0]
  Branch (766:13): [True: 0, False: 0]
  Branch (766:13): [True: 0, False: 0]
  Branch (766:13): [True: 0, False: 0]
  Branch (766:13): [Folded - Ignored]
  Branch (766:13): [True: 0, False: 0]
  Branch (766:13): [True: 17, False: 0]
  Branch (766:13): [True: 500, False: 0]
  Branch (766:13): [True: 90, False: 0]
  Branch (766:13): [True: 0, False: 0]
  Branch (766:13): [True: 4, False: 0]
767
611
            sorted_awaited_action_state_for_flags(filter.stages)
768
        else {
769
0
            let mut all_items: Vec<_> = self
770
0
                .action_db
771
0
                .get_all_awaited_actions()
772
0
                .await
773
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?
774
0
                .and_then(|awaited_action_subscriber| async move {
775
0
                    let awaited_action = awaited_action_subscriber
776
0
                        .borrow()
777
0
                        .await
778
0
                        .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
779
0
                    Ok((awaited_action_subscriber, awaited_action))
780
0
                })
781
0
                .try_filter_map(|(subscriber, awaited_action)| {
782
0
                    let filter = filter.clone();
783
0
                    async move {
784
0
                        Ok(self
785
0
                            .apply_filter_predicate(&awaited_action, &subscriber, &filter)
786
0
                            .await
787
0
                            .then_some((subscriber, awaited_action.sort_key())))
788
0
                    }
789
0
                })
790
0
                .try_collect()
791
0
                .await
792
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
793
0
            match filter.order_by_priority_direction {
794
0
                Some(OrderDirection::Asc) => all_items.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)),
795
0
                Some(OrderDirection::Desc) => all_items.sort_unstable_by(|(_, a), (_, b)| b.cmp(a)),
796
0
                None => {}
797
            }
798
0
            return Ok(Box::pin(stream::iter(
799
0
                all_items
800
0
                    .into_iter()
801
0
                    .map(move |(subscriber, _)| to_action_state_result(subscriber)),
802
            )));
803
        };
804
805
611
        let desc = 
matches!500
(
806
111
            filter.order_by_priority_direction,
807
            Some(OrderDirection::Desc)
808
        );
809
611
        let stream = self
810
611
            .action_db
811
611
            .get_range_of_actions(
812
611
                sorted_awaited_action_state,
813
611
                Bound::Unbounded,
814
611
                Bound::Unbounded,
815
611
                desc,
816
611
            )
817
611
            .await
818
611
            .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
819
611
            .and_then(|awaited_action_subscriber| async move 
{550
820
550
                let awaited_action = awaited_action_subscriber
821
550
                    .borrow()
822
550
                    .await
823
550
                    .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
;
824
550
                Ok((awaited_action_subscriber, awaited_action))
825
1.10k
            })
826
611
            .try_filter_map(move |(subscriber, awaited_action)| 
{550
827
550
                let filter = filter.clone();
828
550
                async move {
829
550
                    Ok(self
830
550
                        .apply_filter_predicate(&awaited_action, &subscriber, &filter)
831
550
                        .await
832
550
                        .then_some(subscriber))
833
550
                }
834
550
            })
835
611
            .map(move |result| -> Box<dyn ActionStateResult> 
{550
836
550
                result.map_or_else(
837
0
                    |e| -> Box<dyn ActionStateResult> { Box::new(ErrorActionStateResult(e)) },
838
550
                    |v| -> Box<dyn ActionStateResult> { to_action_state_result(v) },
839
                )
840
550
            });
841
611
        Ok(Box::pin(stream))
842
615
    }
843
}
844
845
#[async_trait]
846
impl<T, I, NowFn> ClientStateManager for SimpleSchedulerStateManager<T, I, NowFn>
847
where
848
    T: AwaitedActionDb,
849
    I: InstantWrapper,
850
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
851
{
852
    async fn add_action(
853
        &self,
854
        client_operation_id: OperationId,
855
        action_info: Arc<ActionInfo>,
856
30
    ) -> Result<Box<dyn ActionStateResult>, Error> {
857
        let sub = self
858
            .inner_add_operation(client_operation_id, action_info.clone())
859
            .await?;
860
861
        Ok(Box::new(ClientActionStateResult::new(
862
            sub,
863
            self.weak_self.clone(),
864
            self.no_event_action_timeout,
865
            self.now_fn.clone(),
866
        )))
867
30
    }
868
869
    async fn filter_operations<'a>(
870
        &'a self,
871
        filter: OperationFilter,
872
504
    ) -> Result<ActionStateResultStream<'a>, Error> {
873
504
        self.inner_filter_operations(filter, move |rx| {
874
504
            Box::new(ClientActionStateResult::new(
875
504
                rx,
876
504
                self.weak_self.clone(),
877
504
                self.no_event_action_timeout,
878
504
                self.now_fn.clone(),
879
504
            ))
880
504
        })
881
        .await
882
504
    }
883
884
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
885
0
        None
886
0
    }
887
}
888
889
#[async_trait]
890
impl<T, I, NowFn> WorkerStateManager for SimpleSchedulerStateManager<T, I, NowFn>
891
where
892
    T: AwaitedActionDb,
893
    I: InstantWrapper,
894
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
895
{
896
    async fn update_operation(
897
        &self,
898
        operation_id: &OperationId,
899
        worker_id: &WorkerId,
900
        update: UpdateOperationType,
901
16
    ) -> Result<(), Error> {
902
        self.inner_update_operation(operation_id, Some(worker_id), update)
903
            .await
904
16
    }
905
}
906
907
#[async_trait]
908
impl<T, I, NowFn> MatchingEngineStateManager for SimpleSchedulerStateManager<T, I, NowFn>
909
where
910
    T: AwaitedActionDb,
911
    I: InstantWrapper,
912
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
913
{
914
    async fn filter_operations<'a>(
915
        &'a self,
916
        filter: OperationFilter,
917
111
    ) -> Result<ActionStateResultStream<'a>, Error> {
918
50
        self.inner_filter_operations(filter, |rx| {
919
50
            Box::new(MatchingEngineActionStateResult::new(
920
50
                rx,
921
50
                self.weak_self.clone(),
922
50
                self.no_event_action_timeout,
923
50
                self.now_fn.clone(),
924
50
            ))
925
50
        })
926
        .await
927
111
    }
928
929
    async fn assign_operation(
930
        &self,
931
        operation_id: &OperationId,
932
        worker_id_or_reason_for_unassign: Result<&WorkerId, Error>,
933
33
    ) -> Result<(), Error> {
934
        let (maybe_worker_id, update) = match worker_id_or_reason_for_unassign {
935
            Ok(worker_id) => (
936
                Some(worker_id),
937
                UpdateOperationType::UpdateWithActionStage(ActionStage::Executing),
938
            ),
939
            Err(err) => (None, UpdateOperationType::UpdateWithError(err)),
940
        };
941
        self.inner_update_operation(operation_id, maybe_worker_id, update)
942
            .await
943
33
    }
944
}