Coverage Report

Created: 2026-02-23 10:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/simple_scheduler_state_manager.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::ops::Bound;
16
use core::time::Duration;
17
use std::string::ToString;
18
use std::sync::{Arc, Weak};
19
20
use async_lock::Mutex;
21
use async_trait::async_trait;
22
use futures::{StreamExt, TryStreamExt, stream};
23
use nativelink_error::{Code, Error, ResultExt, make_err};
24
use nativelink_metric::MetricsComponent;
25
use nativelink_util::action_messages::{
26
    ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ExecutionMetadata,
27
    OperationId, WorkerId,
28
};
29
use nativelink_util::instant_wrapper::InstantWrapper;
30
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
31
use nativelink_util::metrics::{
32
    EXECUTION_METRICS, EXECUTION_RESULT, EXECUTION_STAGE, ExecutionResult, ExecutionStage,
33
};
34
use nativelink_util::operation_state_manager::{
35
    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
36
    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType, WorkerStateManager,
37
};
38
use nativelink_util::origin_event::OriginMetadata;
39
use opentelemetry::KeyValue;
40
use tracing::{debug, info, trace, warn};
41
42
use super::awaited_action_db::{
43
    AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedActionState,
44
};
45
use crate::worker_registry::SharedWorkerRegistry;
46
47
/// Maximum number of times an update to the database
48
/// can fail before giving up.
49
const MAX_UPDATE_RETRIES: usize = 5;
50
51
/// Base delay for exponential backoff on version conflicts (in ms).
52
const BASE_RETRY_DELAY_MS: u64 = 10;
53
54
/// Maximum jitter to add to retry delay (in ms).
55
const MAX_RETRY_JITTER_MS: u64 = 20;
56
57
/// Simple struct that implements the `ActionStateResult` trait and always returns an error.
58
struct ErrorActionStateResult(Error);
59
60
#[async_trait]
61
impl ActionStateResult for ErrorActionStateResult {
62
0
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
63
        Err(self.0.clone())
64
0
    }
65
66
0
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
67
        Err(self.0.clone())
68
0
    }
69
70
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
71
        Err(self.0.clone())
72
0
    }
73
}
74
75
struct ClientActionStateResult<U, T, I, NowFn>
76
where
77
    U: AwaitedActionSubscriber,
78
    T: AwaitedActionDb,
79
    I: InstantWrapper,
80
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
81
{
82
    inner: MatchingEngineActionStateResult<U, T, I, NowFn>,
83
}
84
85
impl<U, T, I, NowFn> ClientActionStateResult<U, T, I, NowFn>
86
where
87
    U: AwaitedActionSubscriber,
88
    T: AwaitedActionDb,
89
    I: InstantWrapper,
90
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
91
{
92
535
    const fn new(
93
535
        sub: U,
94
535
        simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
95
535
        no_event_action_timeout: Duration,
96
535
        now_fn: NowFn,
97
535
    ) -> Self {
98
535
        Self {
99
535
            inner: MatchingEngineActionStateResult::new(
100
535
                sub,
101
535
                simple_scheduler_state_manager,
102
535
                no_event_action_timeout,
103
535
                now_fn,
104
535
            ),
105
535
        }
106
535
    }
107
}
108
109
#[async_trait]
110
impl<U, T, I, NowFn> ActionStateResult for ClientActionStateResult<U, T, I, NowFn>
111
where
112
    U: AwaitedActionSubscriber,
113
    T: AwaitedActionDb,
114
    I: InstantWrapper,
115
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
116
{
117
6
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
118
        self.inner.as_state().await
119
6
    }
120
121
50
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
122
        self.inner.changed().await
123
50
    }
124
125
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
126
        self.inner.as_action_info().await
127
0
    }
128
}
129
130
struct MatchingEngineActionStateResult<U, T, I, NowFn>
131
where
132
    U: AwaitedActionSubscriber,
133
    T: AwaitedActionDb,
134
    I: InstantWrapper,
135
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
136
{
137
    awaited_action_sub: U,
138
    simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
139
    no_event_action_timeout: Duration,
140
    now_fn: NowFn,
141
}
142
impl<U, T, I, NowFn> MatchingEngineActionStateResult<U, T, I, NowFn>
143
where
144
    U: AwaitedActionSubscriber,
145
    T: AwaitedActionDb,
146
    I: InstantWrapper,
147
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
148
{
149
590
    const fn new(
150
590
        awaited_action_sub: U,
151
590
        simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
152
590
        no_event_action_timeout: Duration,
153
590
        now_fn: NowFn,
154
590
    ) -> Self {
155
590
        Self {
156
590
            awaited_action_sub,
157
590
            simple_scheduler_state_manager,
158
590
            no_event_action_timeout,
159
590
            now_fn,
160
590
        }
161
590
    }
162
}
163
164
#[async_trait]
165
impl<U, T, I, NowFn> ActionStateResult for MatchingEngineActionStateResult<U, T, I, NowFn>
166
where
167
    U: AwaitedActionSubscriber,
168
    T: AwaitedActionDb,
169
    I: InstantWrapper,
170
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
171
{
172
42
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
173
        let awaited_action = self
174
            .awaited_action_sub
175
            .borrow()
176
            .await
177
            .err_tip(|| "In MatchingEngineActionStateResult::as_state")?;
178
        Ok((
179
            awaited_action.state().clone(),
180
            awaited_action.maybe_origin_metadata().cloned(),
181
        ))
182
42
    }
183
184
50
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
185
        let mut timeout_attempts = 0;
186
        loop {
187
            tokio::select! {
188
                awaited_action_result = self.awaited_action_sub.changed() => {
189
                    return awaited_action_result
190
                        .err_tip(|| "In MatchingEngineActionStateResult::changed")
191
46
                        .map(|v| (v.state().clone(), v.maybe_origin_metadata().cloned()));
192
                }
193
                () = (self.now_fn)().sleep(self.no_event_action_timeout) => {
194
                    // Timeout happened, do additional checks below.
195
                }
196
            }
197
198
            let awaited_action = self
199
                .awaited_action_sub
200
                .borrow()
201
                .await
202
                .err_tip(|| "In MatchingEngineActionStateResult::changed")?;
203
204
            if matches!(awaited_action.state().stage, ActionStage::Queued) {
205
                // Actions in queued state do not get periodically updated,
206
                // so we don't need to timeout them.
207
                continue;
208
            }
209
210
            let simple_scheduler_state_manager = self
211
                .simple_scheduler_state_manager
212
                .upgrade()
213
0
                .err_tip(|| format!("Failed to upgrade weak reference to SimpleSchedulerStateManager in MatchingEngineActionStateResult::changed at attempt: {timeout_attempts}"))?;
214
215
            // Check if worker is alive via registry before timing out.
216
            let should_timeout = simple_scheduler_state_manager
217
                .should_timeout_operation(&awaited_action)
218
                .await;
219
220
            if !should_timeout {
221
                // Worker is alive, continue waiting for updates
222
                trace!(
223
                    operation_id = %awaited_action.operation_id(),
224
                    "Operation timeout check passed, worker is alive"
225
                );
226
                continue;
227
            }
228
229
            warn!(
230
                ?awaited_action,
231
                "OperationId {} / {} timed out after {} seconds issuing a retry",
232
                awaited_action.operation_id(),
233
                awaited_action.state().client_operation_id,
234
                self.no_event_action_timeout.as_secs_f32(),
235
            );
236
237
            simple_scheduler_state_manager
238
                .timeout_operation_id(awaited_action.operation_id())
239
                .await
240
                .err_tip(|| "In MatchingEngineActionStateResult::changed")?;
241
242
            if timeout_attempts >= MAX_UPDATE_RETRIES {
243
                return Err(make_err!(
244
                    Code::Internal,
245
                    "Failed to update action after {} retries with no error set in MatchingEngineActionStateResult::changed - {} {:?}",
246
                    MAX_UPDATE_RETRIES,
247
                    awaited_action.operation_id(),
248
                    awaited_action.state().stage,
249
                ));
250
            }
251
            timeout_attempts += 1;
252
        }
253
50
    }
254
255
55
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
256
        let awaited_action = self
257
            .awaited_action_sub
258
            .borrow()
259
            .await
260
            .err_tip(|| "In MatchingEngineActionStateResult::as_action_info")?;
261
        Ok((
262
            awaited_action.action_info().clone(),
263
            awaited_action.maybe_origin_metadata().cloned(),
264
        ))
265
55
    }
266
}
267
268
/// `SimpleSchedulerStateManager` is responsible for maintaining the state of the scheduler.
269
/// Scheduler state includes the actions that are queued, active, and recently completed.
270
/// It also includes the workers that are available to execute actions based on allocation
271
/// strategy.
272
#[derive(MetricsComponent, Debug)]
273
pub struct SimpleSchedulerStateManager<T, I, NowFn>
274
where
275
    T: AwaitedActionDb,
276
    I: InstantWrapper,
277
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
278
{
279
    /// Database for storing the state of all actions.
280
    #[metric(group = "action_db")]
281
    action_db: T,
282
283
    /// Maximum number of times a job can be retried.
284
    // TODO(palfrey) This should be a scheduler decorator instead
285
    // of always having it on every SimpleScheduler.
286
    #[metric(help = "Maximum number of times a job can be retried")]
287
    max_job_retries: usize,
288
289
    /// Duration after which an action is considered to be timed out if
290
    /// no event is received.
291
    #[metric(
292
        help = "Duration after which an action is considered to be timed out if no event is received"
293
    )]
294
    no_event_action_timeout: Duration,
295
296
    /// Mark operation as timed out if the worker has not updated in this duration.
297
    /// This is used to prevent operations from being stuck in the queue forever
298
    /// if it is not being processed by any worker.
299
    client_action_timeout: Duration,
300
301
    /// Maximum time an action can stay in Executing state without any worker
302
    /// update, regardless of worker keepalive status. `Duration::ZERO` disables.
303
    max_executing_timeout: Duration,
304
305
    // A lock to ensure only one timeout operation is running at a time
306
    // on this service.
307
    timeout_operation_mux: Mutex<()>,
308
309
    /// Weak reference to self.
310
    // We use a weak reference to reduce the risk of a memory leak from
311
    // future changes. If this becomes some kind of performance issue,
312
    // we can consider using a strong reference.
313
    weak_self: Weak<Self>,
314
315
    /// Function to get the current time.
316
    now_fn: NowFn,
317
318
    /// Worker registry for checking worker liveness.
319
    worker_registry: Option<SharedWorkerRegistry>,
320
}
321
322
impl<T, I, NowFn> SimpleSchedulerStateManager<T, I, NowFn>
323
where
324
    T: AwaitedActionDb,
325
    I: InstantWrapper,
326
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
327
{
328
26
    pub fn new(
329
26
        max_job_retries: usize,
330
26
        no_event_action_timeout: Duration,
331
26
        client_action_timeout: Duration,
332
26
        max_executing_timeout: Duration,
333
26
        action_db: T,
334
26
        now_fn: NowFn,
335
26
        worker_registry: Option<SharedWorkerRegistry>,
336
26
    ) -> Arc<Self> {
337
26
        Arc::new_cyclic(|weak_self| Self {
338
26
            action_db,
339
26
            max_job_retries,
340
26
            no_event_action_timeout,
341
26
            client_action_timeout,
342
26
            max_executing_timeout,
343
26
            timeout_operation_mux: Mutex::new(()),
344
26
            weak_self: weak_self.clone(),
345
26
            now_fn,
346
26
            worker_registry,
347
26
        })
348
26
    }
349
350
1
    pub async fn should_timeout_operation(&self, awaited_action: &AwaitedAction) -> bool {
351
1
        if !
matches!0
(awaited_action.state().stage, ActionStage::Executing) {
  Branch (351:12): [True: 0, False: 0]
  Branch (351:12): [True: 0, False: 0]
  Branch (351:12): [Folded - Ignored]
  Branch (351:12): [True: 0, False: 0]
  Branch (351:12): [True: 0, False: 1]
  Branch (351:12): [True: 0, False: 0]
  Branch (351:12): [True: 0, False: 0]
352
0
            return false;
353
1
        }
354
355
1
        let now = (self.now_fn)().now();
356
357
1
        let registry_alive = if let Some(ref worker_registry) = self.worker_registry {
  Branch (357:37): [True: 0, False: 0]
  Branch (357:37): [True: 0, False: 0]
  Branch (357:37): [Folded - Ignored]
  Branch (357:37): [True: 0, False: 0]
  Branch (357:37): [True: 1, False: 0]
  Branch (357:37): [True: 0, False: 0]
  Branch (357:37): [True: 0, False: 0]
358
1
            if let Some(worker_id) = awaited_action.worker_id() {
  Branch (358:20): [True: 0, False: 0]
  Branch (358:20): [True: 0, False: 0]
  Branch (358:20): [Folded - Ignored]
  Branch (358:20): [True: 0, False: 0]
  Branch (358:20): [True: 1, False: 0]
  Branch (358:20): [True: 0, False: 0]
  Branch (358:20): [True: 0, False: 0]
359
1
                worker_registry
360
1
                    .is_worker_alive(worker_id, self.no_event_action_timeout, now)
361
1
                    .await
362
            } else {
363
0
                false
364
            }
365
        } else {
366
0
            false
367
        };
368
369
1
        if registry_alive {
  Branch (369:12): [True: 0, False: 0]
  Branch (369:12): [True: 0, False: 0]
  Branch (369:12): [Folded - Ignored]
  Branch (369:12): [True: 0, False: 0]
  Branch (369:12): [True: 0, False: 1]
  Branch (369:12): [True: 0, False: 0]
  Branch (369:12): [True: 0, False: 0]
370
0
            if self.max_executing_timeout > Duration::ZERO {
  Branch (370:16): [True: 0, False: 0]
  Branch (370:16): [True: 0, False: 0]
  Branch (370:16): [Folded - Ignored]
  Branch (370:16): [True: 0, False: 0]
  Branch (370:16): [True: 0, False: 0]
  Branch (370:16): [True: 0, False: 0]
  Branch (370:16): [True: 0, False: 0]
371
0
                let last_update = awaited_action.last_worker_updated_timestamp();
372
0
                if let Ok(elapsed) = now.duration_since(last_update) {
  Branch (372:24): [True: 0, False: 0]
  Branch (372:24): [True: 0, False: 0]
  Branch (372:24): [Folded - Ignored]
  Branch (372:24): [True: 0, False: 0]
  Branch (372:24): [True: 0, False: 0]
  Branch (372:24): [True: 0, False: 0]
  Branch (372:24): [True: 0, False: 0]
373
0
                    return elapsed > self.max_executing_timeout;
374
0
                }
375
0
            }
376
0
            return false;
377
1
        }
378
379
1
        let worker_should_update_before = awaited_action
380
1
            .last_worker_updated_timestamp()
381
1
            .checked_add(self.no_event_action_timeout)
382
1
            .unwrap_or(now);
383
384
1
        worker_should_update_before < now
385
1
    }
386
387
559
    async fn apply_filter_predicate(
388
559
        &self,
389
559
        awaited_action: &AwaitedAction,
390
559
        subscriber: &T::Subscriber,
391
559
        filter: &OperationFilter,
392
559
    ) -> bool {
393
        // Note: The caller must filter `client_operation_id`.
394
395
559
        let mut maybe_reloaded_awaited_action: Option<AwaitedAction> = None;
396
559
        let now = (self.now_fn)().now();
397
398
        // Check if client has timed out
399
559
        if awaited_action.last_client_keepalive_timestamp() + self.client_action_timeout < now {
  Branch (399:12): [True: 0, False: 0]
  Branch (399:12): [True: 0, False: 0]
  Branch (399:12): [Folded - Ignored]
  Branch (399:12): [True: 0, False: 9]
  Branch (399:12): [True: 0, False: 548]
  Branch (399:12): [True: 0, False: 0]
  Branch (399:12): [True: 0, False: 2]
400
            // This may change if the version is out of date.
401
0
            let mut timed_out = true;
402
0
            if !awaited_action.state().stage.is_finished() {
  Branch (402:16): [True: 0, False: 0]
  Branch (402:16): [True: 0, False: 0]
  Branch (402:16): [Folded - Ignored]
  Branch (402:16): [True: 0, False: 0]
  Branch (402:16): [True: 0, False: 0]
  Branch (402:16): [True: 0, False: 0]
  Branch (402:16): [True: 0, False: 0]
403
0
                let mut state = awaited_action.state().as_ref().clone();
404
0
                state.stage = ActionStage::Completed(ActionResult {
405
0
                    error: Some(make_err!(
406
0
                        Code::DeadlineExceeded,
407
0
                        "Operation timed out {} seconds of having no more clients listening",
408
0
                        self.client_action_timeout.as_secs_f32(),
409
0
                    )),
410
0
                    ..ActionResult::default()
411
0
                });
412
0
                state.last_transition_timestamp = now;
413
0
                let state = Arc::new(state);
414
                // We may be competing with an client timestamp update, so try
415
                // this a few times.
416
0
                for attempt in 1..=MAX_UPDATE_RETRIES {
417
0
                    let mut new_awaited_action = match &maybe_reloaded_awaited_action {
418
0
                        None => awaited_action.clone(),
419
0
                        Some(reloaded_awaited_action) => reloaded_awaited_action.clone(),
420
                    };
421
0
                    new_awaited_action.worker_set_state(state.clone(), (self.now_fn)().now());
422
0
                    let err = match self
423
0
                        .action_db
424
0
                        .update_awaited_action(new_awaited_action)
425
0
                        .await
426
                    {
427
0
                        Ok(()) => break,
428
0
                        Err(err) => err,
429
                    };
430
                    // Reload from the database if the action was outdated.
431
0
                    let maybe_awaited_action =
432
0
                        if attempt == MAX_UPDATE_RETRIES || err.code != Code::Aborted {
  Branch (432:28): [True: 0, False: 0]
  Branch (432:61): [True: 0, False: 0]
  Branch (432:28): [True: 0, False: 0]
  Branch (432:61): [True: 0, False: 0]
  Branch (432:28): [Folded - Ignored]
  Branch (432:61): [Folded - Ignored]
  Branch (432:28): [True: 0, False: 0]
  Branch (432:61): [True: 0, False: 0]
  Branch (432:28): [True: 0, False: 0]
  Branch (432:61): [True: 0, False: 0]
  Branch (432:28): [True: 0, False: 0]
  Branch (432:61): [True: 0, False: 0]
  Branch (432:28): [True: 0, False: 0]
  Branch (432:61): [True: 0, False: 0]
433
0
                            None
434
                        } else {
435
0
                            subscriber.borrow().await.ok()
436
                        };
437
0
                    if let Some(reloaded_awaited_action) = maybe_awaited_action {
  Branch (437:28): [True: 0, False: 0]
  Branch (437:28): [True: 0, False: 0]
  Branch (437:28): [Folded - Ignored]
  Branch (437:28): [True: 0, False: 0]
  Branch (437:28): [True: 0, False: 0]
  Branch (437:28): [True: 0, False: 0]
  Branch (437:28): [True: 0, False: 0]
438
0
                        maybe_reloaded_awaited_action = Some(reloaded_awaited_action);
439
0
                    } else {
440
0
                        warn!(
441
0
                            "Failed to update action to timed out state after client keepalive timeout. This is ok if multiple schedulers tried to set the state at the same time: {err}",
442
                        );
443
0
                        break;
444
                    }
445
                    // Re-check the predicate after reload.
446
0
                    if maybe_reloaded_awaited_action
  Branch (446:24): [True: 0, False: 0]
  Branch (446:24): [True: 0, False: 0]
  Branch (446:24): [Folded - Ignored]
  Branch (446:24): [True: 0, False: 0]
  Branch (446:24): [True: 0, False: 0]
  Branch (446:24): [True: 0, False: 0]
  Branch (446:24): [True: 0, False: 0]
447
0
                        .as_ref()
448
0
                        .is_some_and(|awaited_action| {
449
0
                            awaited_action.last_client_keepalive_timestamp()
450
0
                                + self.client_action_timeout
451
0
                                >= (self.now_fn)().now()
452
0
                        })
453
                    {
454
0
                        timed_out = false;
455
0
                        break;
456
0
                    } else if maybe_reloaded_awaited_action
  Branch (456:31): [True: 0, False: 0]
  Branch (456:31): [True: 0, False: 0]
  Branch (456:31): [Folded - Ignored]
  Branch (456:31): [True: 0, False: 0]
  Branch (456:31): [True: 0, False: 0]
  Branch (456:31): [True: 0, False: 0]
  Branch (456:31): [True: 0, False: 0]
457
0
                        .as_ref()
458
0
                        .is_some_and(|awaited_action| awaited_action.state().stage.is_finished())
459
                    {
460
0
                        break;
461
0
                    }
462
                }
463
0
            }
464
0
            if timed_out {
  Branch (464:16): [True: 0, False: 0]
  Branch (464:16): [True: 0, False: 0]
  Branch (464:16): [Folded - Ignored]
  Branch (464:16): [True: 0, False: 0]
  Branch (464:16): [True: 0, False: 0]
  Branch (464:16): [True: 0, False: 0]
  Branch (464:16): [True: 0, False: 0]
465
0
                return false;
466
0
            }
467
559
        }
468
        // If the action was reloaded, then use that for the rest of the checks
469
        // instead of the input parameter.
470
559
        let awaited_action = maybe_reloaded_awaited_action
471
559
            .as_ref()
472
559
            .unwrap_or(awaited_action);
473
474
559
        if let Some(
operation_id0
) = &filter.operation_id {
  Branch (474:16): [True: 0, False: 0]
  Branch (474:16): [True: 0, False: 0]
  Branch (474:16): [Folded - Ignored]
  Branch (474:16): [True: 0, False: 9]
  Branch (474:16): [True: 0, False: 548]
  Branch (474:16): [True: 0, False: 0]
  Branch (474:16): [True: 0, False: 2]
475
0
            if operation_id != awaited_action.operation_id() {
  Branch (475:16): [True: 0, False: 0]
  Branch (475:16): [True: 0, False: 0]
  Branch (475:16): [Folded - Ignored]
  Branch (475:16): [True: 0, False: 0]
  Branch (475:16): [True: 0, False: 0]
  Branch (475:16): [True: 0, False: 0]
  Branch (475:16): [True: 0, False: 0]
476
0
                return false;
477
0
            }
478
559
        }
479
480
559
        if filter.worker_id.is_some() && 
filter.worker_id.as_ref()0
!= awaited_action.worker_id() {
  Branch (480:12): [True: 0, False: 0]
  Branch (480:42): [True: 0, False: 0]
  Branch (480:12): [True: 0, False: 0]
  Branch (480:42): [True: 0, False: 0]
  Branch (480:12): [Folded - Ignored]
  Branch (480:42): [Folded - Ignored]
  Branch (480:12): [True: 0, False: 9]
  Branch (480:42): [True: 0, False: 0]
  Branch (480:12): [True: 0, False: 548]
  Branch (480:42): [True: 0, False: 0]
  Branch (480:12): [True: 0, False: 0]
  Branch (480:42): [True: 0, False: 0]
  Branch (480:12): [True: 0, False: 2]
  Branch (480:42): [True: 0, False: 0]
481
0
            return false;
482
559
        }
483
484
        {
485
559
            if let Some(
filter_unique_key0
) = &filter.unique_key {
  Branch (485:20): [True: 0, False: 0]
  Branch (485:20): [True: 0, False: 0]
  Branch (485:20): [Folded - Ignored]
  Branch (485:20): [True: 0, False: 9]
  Branch (485:20): [True: 0, False: 548]
  Branch (485:20): [True: 0, False: 0]
  Branch (485:20): [True: 0, False: 2]
486
0
                match &awaited_action.action_info().unique_qualifier {
487
0
                    ActionUniqueQualifier::Cacheable(unique_key) => {
488
0
                        if filter_unique_key != unique_key {
  Branch (488:28): [True: 0, False: 0]
  Branch (488:28): [True: 0, False: 0]
  Branch (488:28): [Folded - Ignored]
  Branch (488:28): [True: 0, False: 0]
  Branch (488:28): [True: 0, False: 0]
  Branch (488:28): [True: 0, False: 0]
  Branch (488:28): [True: 0, False: 0]
489
0
                            return false;
490
0
                        }
491
                    }
492
                    ActionUniqueQualifier::Uncacheable(_) => {
493
0
                        return false;
494
                    }
495
                }
496
559
            }
497
559
            if let Some(
action_digest0
) = filter.action_digest {
  Branch (497:20): [True: 0, False: 0]
  Branch (497:20): [True: 0, False: 0]
  Branch (497:20): [Folded - Ignored]
  Branch (497:20): [True: 0, False: 9]
  Branch (497:20): [True: 0, False: 548]
  Branch (497:20): [True: 0, False: 0]
  Branch (497:20): [True: 0, False: 2]
498
0
                if action_digest != awaited_action.action_info().digest() {
  Branch (498:20): [True: 0, False: 0]
  Branch (498:20): [True: 0, False: 0]
  Branch (498:20): [Folded - Ignored]
  Branch (498:20): [True: 0, False: 0]
  Branch (498:20): [True: 0, False: 0]
  Branch (498:20): [True: 0, False: 0]
  Branch (498:20): [True: 0, False: 0]
499
0
                    return false;
500
0
                }
501
559
            }
502
        }
503
504
        {
505
559
            let last_worker_update_timestamp = awaited_action.last_worker_updated_timestamp();
506
559
            if let Some(
worker_update_before0
) = filter.worker_update_before {
  Branch (506:20): [True: 0, False: 0]
  Branch (506:20): [True: 0, False: 0]
  Branch (506:20): [Folded - Ignored]
  Branch (506:20): [True: 0, False: 9]
  Branch (506:20): [True: 0, False: 548]
  Branch (506:20): [True: 0, False: 0]
  Branch (506:20): [True: 0, False: 2]
507
0
                if worker_update_before < last_worker_update_timestamp {
  Branch (507:20): [True: 0, False: 0]
  Branch (507:20): [True: 0, False: 0]
  Branch (507:20): [Folded - Ignored]
  Branch (507:20): [True: 0, False: 0]
  Branch (507:20): [True: 0, False: 0]
  Branch (507:20): [True: 0, False: 0]
  Branch (507:20): [True: 0, False: 0]
508
0
                    return false;
509
0
                }
510
559
            }
511
559
            if let Some(
completed_before0
) = filter.completed_before {
  Branch (511:20): [True: 0, False: 0]
  Branch (511:20): [True: 0, False: 0]
  Branch (511:20): [Folded - Ignored]
  Branch (511:20): [True: 0, False: 9]
  Branch (511:20): [True: 0, False: 548]
  Branch (511:20): [True: 0, False: 0]
  Branch (511:20): [True: 0, False: 2]
512
0
                if awaited_action.state().stage.is_finished()
  Branch (512:20): [True: 0, False: 0]
  Branch (512:20): [True: 0, False: 0]
  Branch (512:20): [Folded - Ignored]
  Branch (512:20): [True: 0, False: 0]
  Branch (512:20): [True: 0, False: 0]
  Branch (512:20): [True: 0, False: 0]
  Branch (512:20): [True: 0, False: 0]
513
0
                    && completed_before < last_worker_update_timestamp
  Branch (513:24): [True: 0, False: 0]
  Branch (513:24): [True: 0, False: 0]
  Branch (513:24): [Folded - Ignored]
  Branch (513:24): [True: 0, False: 0]
  Branch (513:24): [True: 0, False: 0]
  Branch (513:24): [True: 0, False: 0]
  Branch (513:24): [True: 0, False: 0]
514
                {
515
0
                    return false;
516
0
                }
517
559
            }
518
559
            if filter.stages != OperationStageFlags::Any {
  Branch (518:16): [True: 0, False: 0]
  Branch (518:16): [True: 0, False: 0]
  Branch (518:16): [Folded - Ignored]
  Branch (518:16): [True: 8, False: 1]
  Branch (518:16): [True: 545, False: 3]
  Branch (518:16): [True: 0, False: 0]
  Branch (518:16): [True: 2, False: 0]
519
555
                let stage_flag = match awaited_action.state().stage {
520
0
                    ActionStage::Unknown => OperationStageFlags::Any,
521
0
                    ActionStage::CacheCheck => OperationStageFlags::CacheCheck,
522
555
                    ActionStage::Queued => OperationStageFlags::Queued,
523
0
                    ActionStage::Executing => OperationStageFlags::Executing,
524
                    ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => {
525
0
                        OperationStageFlags::Completed
526
                    }
527
                };
528
555
                if !filter.stages.intersects(stage_flag) {
  Branch (528:20): [True: 0, False: 0]
  Branch (528:20): [True: 0, False: 0]
  Branch (528:20): [Folded - Ignored]
  Branch (528:20): [True: 0, False: 8]
  Branch (528:20): [True: 0, False: 545]
  Branch (528:20): [True: 0, False: 0]
  Branch (528:20): [True: 0, False: 2]
529
0
                    return false;
530
555
                }
531
4
            }
532
        }
533
534
559
        true
535
559
    }
536
537
    /// Let the scheduler know that an operation has timed out from
538
    /// the client side (ie: worker has not updated in a while).
539
1
    async fn timeout_operation_id(&self, operation_id: &OperationId) -> Result<(), Error> {
540
        // Ensure that only one timeout operation is running at a time.
541
        // Failing to do this could result in the same operation being
542
        // timed out multiple times at the same time.
543
        // Note: We could implement this on a per-operation_id basis, but it is quite
544
        // complex to manage the locks.
545
1
        let _lock = self.timeout_operation_mux.lock().await;
546
547
1
        let awaited_action_subscriber = self
548
1
            .action_db
549
1
            .get_by_operation_id(operation_id)
550
1
            .await
551
1
            .err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")
?0
552
1
            .err_tip(|| 
{0
553
0
                format!("Operation id {operation_id} does not exist in SimpleSchedulerStateManager::timeout_operation_id")
554
0
            })?;
555
556
1
        let awaited_action = awaited_action_subscriber
557
1
            .borrow()
558
1
            .await
559
1
            .err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")
?0
;
560
561
        // If the action is not executing, we should not timeout the action.
562
1
        if !
matches!0
(awaited_action.state().stage, ActionStage::Executing) {
  Branch (562:12): [True: 0, False: 0]
  Branch (562:12): [True: 0, False: 0]
  Branch (562:12): [Folded - Ignored]
  Branch (562:12): [True: 0, False: 0]
  Branch (562:12): [True: 0, False: 1]
  Branch (562:12): [True: 0, False: 0]
  Branch (562:12): [True: 0, False: 0]
563
0
            return Ok(());
564
1
        }
565
566
1
        let now = (self.now_fn)().now();
567
568
        // Check worker liveness via registry if available.
569
1
        let registry_alive = if let Some(ref worker_registry) = self.worker_registry {
  Branch (569:37): [True: 0, False: 0]
  Branch (569:37): [True: 0, False: 0]
  Branch (569:37): [Folded - Ignored]
  Branch (569:37): [True: 0, False: 0]
  Branch (569:37): [True: 1, False: 0]
  Branch (569:37): [True: 0, False: 0]
  Branch (569:37): [True: 0, False: 0]
570
1
            if let Some(worker_id) = awaited_action.worker_id() {
  Branch (570:20): [True: 0, False: 0]
  Branch (570:20): [True: 0, False: 0]
  Branch (570:20): [Folded - Ignored]
  Branch (570:20): [True: 0, False: 0]
  Branch (570:20): [True: 1, False: 0]
  Branch (570:20): [True: 0, False: 0]
  Branch (570:20): [True: 0, False: 0]
571
1
                worker_registry
572
1
                    .is_worker_alive(worker_id, self.no_event_action_timeout, now)
573
1
                    .await
574
            } else {
575
0
                false
576
            }
577
        } else {
578
0
            false
579
        };
580
581
1
        let timestamp_alive = {
582
1
            let worker_should_update_before = awaited_action
583
1
                .last_worker_updated_timestamp()
584
1
                .checked_add(self.no_event_action_timeout)
585
1
                .unwrap_or(now);
586
1
            worker_should_update_before >= now
587
        };
588
589
1
        if registry_alive || timestamp_alive {
  Branch (589:12): [True: 0, False: 0]
  Branch (589:30): [True: 0, False: 0]
  Branch (589:12): [True: 0, False: 0]
  Branch (589:30): [True: 0, False: 0]
  Branch (589:12): [Folded - Ignored]
  Branch (589:30): [Folded - Ignored]
  Branch (589:12): [True: 0, False: 0]
  Branch (589:30): [True: 0, False: 0]
  Branch (589:12): [True: 0, False: 1]
  Branch (589:30): [True: 0, False: 1]
  Branch (589:12): [True: 0, False: 0]
  Branch (589:30): [True: 0, False: 0]
  Branch (589:12): [True: 0, False: 0]
  Branch (589:30): [True: 0, False: 0]
590
0
            trace!(
591
                %operation_id,
592
0
                worker_id = ?awaited_action.worker_id(),
593
                registry_alive,
594
                timestamp_alive,
595
0
                "Worker is alive, operation not timed out"
596
            );
597
0
            return Ok(());
598
1
        }
599
600
1
        debug!(
601
            %operation_id,
602
1
            worker_id = ?awaited_action.worker_id(),
603
            registry_alive,
604
            timestamp_alive,
605
1
            "Worker not alive via registry or timestamp, timing out operation"
606
        );
607
608
1
        self.assign_operation(
609
1
            operation_id,
610
1
            Err(make_err!(
611
1
                Code::DeadlineExceeded,
612
1
                "Operation timed out after {} seconds",
613
1
                self.no_event_action_timeout.as_secs_f32(),
614
1
            )),
615
1
        )
616
1
        .await
617
1
    }
618
619
54
    async fn inner_update_operation(
620
54
        &self,
621
54
        operation_id: &OperationId,
622
54
        maybe_worker_id: Option<&WorkerId>,
623
54
        update: UpdateOperationType,
624
54
    ) -> Result<(), Error> {
625
54
        let update_type_str = match &update {
626
0
            UpdateOperationType::KeepAlive => "KeepAlive",
627
41
            UpdateOperationType::UpdateWithActionStage(stage) => match stage {
628
0
                ActionStage::Queued => "Stage:Queued",
629
36
                ActionStage::Executing => "Stage:Executing",
630
5
                ActionStage::Completed(_) => "Stage:Completed",
631
0
                ActionStage::CompletedFromCache(_) => "Stage:CompletedFromCache",
632
0
                ActionStage::CacheCheck => "Stage:CacheCheck",
633
0
                ActionStage::Unknown => "Stage:Unknown",
634
            },
635
12
            UpdateOperationType::UpdateWithError(_) => "Error",
636
0
            UpdateOperationType::UpdateWithDisconnect => "Disconnect",
637
1
            UpdateOperationType::ExecutionComplete => "ExecutionComplete",
638
        };
639
640
54
        debug!(
641
            %operation_id,
642
            ?maybe_worker_id,
643
            update_type = %update_type_str,
644
54
            "inner_update_operation START"
645
        );
646
647
54
        let mut last_err = None;
648
54
        let mut retry_count = 0;
649
58
        for _ in 0..MAX_UPDATE_RETRIES {
650
58
            retry_count += 1;
651
58
            if retry_count > 1 {
  Branch (651:16): [True: 0, False: 0]
  Branch (651:16): [True: 0, False: 0]
  Branch (651:16): [Folded - Ignored]
  Branch (651:16): [True: 3, False: 12]
  Branch (651:16): [True: 0, False: 1]
  Branch (651:16): [True: 0, False: 39]
  Branch (651:16): [True: 0, False: 0]
  Branch (651:16): [True: 1, False: 2]
652
4
                let base_delay = BASE_RETRY_DELAY_MS * (1 << (retry_count - 2).min(4));
653
4
                let jitter = std::time::SystemTime::now()
654
4
                    .duration_since(std::time::UNIX_EPOCH)
655
4
                    .map(|d| u64::try_from(d.as_nanos()).expect("u64 error") % MAX_RETRY_JITTER_MS)
656
4
                    .unwrap_or(0);
657
4
                let delay = Duration::from_millis(base_delay + jitter);
658
659
4
                warn!(
660
                    %operation_id,
661
                    ?maybe_worker_id,
662
                    retry_count,
663
4
                    delay_ms = delay.as_millis(),
664
                    update_type = %update_type_str,
665
4
                    "Retrying operation update due to version conflict (with backoff)"
666
                );
667
668
4
                tokio::time::sleep(delay).await;
669
54
            }
670
58
            let maybe_awaited_action_subscriber = self
671
58
                .action_db
672
58
                .get_by_operation_id(operation_id)
673
58
                .await
674
58
                .err_tip(|| "In SimpleSchedulerStateManager::update_operation")
?0
;
675
58
            let Some(
awaited_action_subscriber56
) = maybe_awaited_action_subscriber else {
  Branch (675:17): [True: 0, False: 0]
  Branch (675:17): [True: 0, False: 0]
  Branch (675:17): [Folded - Ignored]
  Branch (675:17): [True: 15, False: 0]
  Branch (675:17): [True: 0, False: 1]
  Branch (675:17): [True: 39, False: 0]
  Branch (675:17): [True: 0, False: 0]
  Branch (675:17): [True: 2, False: 1]
676
                // No action found. It is ok if the action was not found. It
677
                // probably means that the action was dropped, but worker was
678
                // still processing it.
679
2
                warn!(
680
                    %operation_id,
681
2
                    "Unable to update action due to it being missing, probably dropped"
682
                );
683
2
                return Ok(());
684
            };
685
686
56
            let mut awaited_action = awaited_action_subscriber
687
56
                .borrow()
688
56
                .await
689
56
                .err_tip(|| "In SimpleSchedulerStateManager::update_operation")
?0
;
690
691
            // Make sure the worker id matches the awaited action worker id.
692
            // This might happen if the worker sending the update is not the
693
            // worker that was assigned.
694
56
            if awaited_action.worker_id().is_some()
  Branch (694:16): [True: 0, False: 0]
  Branch (694:16): [True: 0, False: 0]
  Branch (694:16): [Folded - Ignored]
  Branch (694:16): [True: 8, False: 7]
  Branch (694:16): [True: 0, False: 0]
  Branch (694:16): [True: 13, False: 26]
  Branch (694:16): [True: 0, False: 0]
  Branch (694:16): [True: 0, False: 2]
695
21
                && maybe_worker_id.is_some()
  Branch (695:20): [True: 0, False: 0]
  Branch (695:20): [True: 0, False: 0]
  Branch (695:20): [Folded - Ignored]
  Branch (695:20): [True: 8, False: 0]
  Branch (695:20): [True: 0, False: 0]
  Branch (695:20): [True: 12, False: 1]
  Branch (695:20): [True: 0, False: 0]
  Branch (695:20): [True: 0, False: 0]
696
20
                && maybe_worker_id != awaited_action.worker_id()
  Branch (696:20): [True: 0, False: 0]
  Branch (696:20): [True: 0, False: 0]
  Branch (696:20): [Folded - Ignored]
  Branch (696:20): [True: 0, False: 8]
  Branch (696:20): [True: 0, False: 0]
  Branch (696:20): [True: 0, False: 12]
  Branch (696:20): [True: 0, False: 0]
  Branch (696:20): [True: 0, False: 0]
697
            {
698
                // If another worker is already assigned to the action, another
699
                // worker probably picked up the action. We should not update the
700
                // action in this case and abort this operation.
701
0
                let err = make_err!(
702
0
                    Code::Aborted,
703
                    "Worker ids do not match - {:?} != {:?} for {:?}",
704
                    maybe_worker_id,
705
0
                    awaited_action.worker_id(),
706
                    awaited_action,
707
                );
708
0
                info!(
709
0
                    "Worker ids do not match - {:?} != {:?} for {:?}. This is probably due to another worker picking up the action.",
710
                    maybe_worker_id,
711
0
                    awaited_action.worker_id(),
712
                    awaited_action,
713
                );
714
0
                return Err(err);
715
56
            }
716
717
            // Make sure we don't update an action that is already completed.
718
56
            if awaited_action.state().stage.is_finished() {
  Branch (718:16): [True: 0, False: 0]
  Branch (718:16): [True: 0, False: 0]
  Branch (718:16): [Folded - Ignored]
  Branch (718:16): [True: 0, False: 15]
  Branch (718:16): [True: 0, False: 0]
  Branch (718:16): [True: 0, False: 39]
  Branch (718:16): [True: 0, False: 0]
  Branch (718:16): [True: 0, False: 2]
719
0
                match &update {
720
                    UpdateOperationType::UpdateWithDisconnect | UpdateOperationType::KeepAlive => {
721
                        // No need to error a keep-alive when it's completed, it's just
722
                        // unnecessary log noise.
723
0
                        return Ok(());
724
                    }
725
                    _ => {
726
0
                        return Err(make_err!(
727
0
                            Code::Internal,
728
0
                            "Action {operation_id} is already completed with state {:?} - maybe_worker_id: {:?}",
729
0
                            awaited_action.state().stage,
730
0
                            maybe_worker_id,
731
0
                        ));
732
                    }
733
                }
734
56
            }
735
736
56
            let 
stage52
= match &update {
737
                UpdateOperationType::KeepAlive => {
738
0
                    awaited_action.worker_keep_alive((self.now_fn)().now());
739
0
                    match self
740
0
                        .action_db
741
0
                        .update_awaited_action(awaited_action)
742
0
                        .await
743
0
                        .err_tip(|| "Failed to send KeepAlive in SimpleSchedulerStateManager::update_operation") {
744
                        // Try again if there was a version mismatch.
745
0
                        Err(err) if err.code == Code::Aborted => {
  Branch (745:37): [True: 0, False: 0]
  Branch (745:37): [True: 0, False: 0]
  Branch (745:37): [Folded - Ignored]
  Branch (745:37): [True: 0, False: 0]
  Branch (745:37): [True: 0, False: 0]
  Branch (745:37): [True: 0, False: 0]
  Branch (745:37): [True: 0, False: 0]
  Branch (745:37): [True: 0, False: 0]
746
0
                            last_err = Some(err);
747
0
                            continue;
748
                        }
749
0
                        result => return result,
750
                    }
751
                }
752
44
                UpdateOperationType::UpdateWithActionStage(stage) => {
753
44
                    if stage == &ActionStage::Executing
  Branch (753:24): [True: 0, False: 0]
  Branch (753:24): [True: 0, False: 0]
  Branch (753:24): [Folded - Ignored]
  Branch (753:24): [True: 11, False: 0]
  Branch (753:24): [True: 0, False: 0]
  Branch (753:24): [True: 26, False: 5]
  Branch (753:24): [True: 0, False: 0]
  Branch (753:24): [True: 2, False: 0]
754
39
                        && awaited_action.state().stage == ActionStage::Executing
  Branch (754:28): [True: 0, False: 0]
  Branch (754:28): [True: 0, False: 0]
  Branch (754:28): [Folded - Ignored]
  Branch (754:28): [True: 4, False: 7]
  Branch (754:28): [True: 0, False: 0]
  Branch (754:28): [True: 0, False: 26]
  Branch (754:28): [True: 0, False: 0]
  Branch (754:28): [True: 0, False: 2]
755
                    {
756
4
                        warn!(state = ?awaited_action.state(), "Action already assigned");
757
4
                        return Err(make_err!(Code::Aborted, "Action already assigned"));
758
40
                    }
759
40
                    stage.clone()
760
                }
761
12
                UpdateOperationType::UpdateWithError(err) => {
762
                    // Don't count a backpressure failure as an attempt for an action.
763
12
                    let due_to_backpressure = err.code == Code::ResourceExhausted;
764
12
                    if !due_to_backpressure {
  Branch (764:24): [True: 0, False: 0]
  Branch (764:24): [True: 0, False: 0]
  Branch (764:24): [Folded - Ignored]
  Branch (764:24): [True: 4, False: 0]
  Branch (764:24): [True: 0, False: 0]
  Branch (764:24): [True: 8, False: 0]
  Branch (764:24): [True: 0, False: 0]
  Branch (764:24): [True: 0, False: 0]
765
12
                        awaited_action.attempts += 1;
766
12
                    
}0
767
768
12
                    if awaited_action.attempts > self.max_job_retries {
  Branch (768:24): [True: 0, False: 0]
  Branch (768:24): [True: 0, False: 0]
  Branch (768:24): [Folded - Ignored]
  Branch (768:24): [True: 1, False: 3]
  Branch (768:24): [True: 0, False: 0]
  Branch (768:24): [True: 1, False: 7]
  Branch (768:24): [True: 0, False: 0]
  Branch (768:24): [True: 0, False: 0]
769
2
                        ActionStage::Completed(ActionResult {
770
2
                            execution_metadata: ExecutionMetadata {
771
2
                                worker: maybe_worker_id.map_or_else(String::default, ToString::to_string),
772
2
                                ..ExecutionMetadata::default()
773
2
                            },
774
2
                            error: Some(err.clone().merge(make_err!(
775
2
                                Code::Internal,
776
2
                                "Job cancelled because it attempted to execute too many times {} > {} times {}",
777
2
                                awaited_action.attempts,
778
2
                                self.max_job_retries,
779
2
                                format!("for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"),
780
2
                            ))),
781
2
                            ..ActionResult::default()
782
2
                        })
783
                    } else {
784
10
                        ActionStage::Queued
785
                    }
786
                }
787
0
                UpdateOperationType::UpdateWithDisconnect => ActionStage::Queued,
788
                // We shouldn't get here, but we just ignore it if we do.
789
                UpdateOperationType::ExecutionComplete => {
790
0
                    warn!("inner_update_operation got an ExecutionComplete, that's unexpected.");
791
0
                    return Ok(());
792
                }
793
            };
794
52
            let now = (self.now_fn)().now();
795
52
            if 
matches!42
(stage, ActionStage::Queued) {
796
10
                // If the action is queued, we need to unset the worker id regardless of
797
10
                // which worker sent the update.
798
10
                awaited_action.set_worker_id(None, now);
799
42
            } else {
800
42
                awaited_action.set_worker_id(maybe_worker_id.cloned(), now);
801
42
            }
802
52
            awaited_action.worker_set_state(
803
52
                Arc::new(ActionState {
804
52
                    stage,
805
52
                    // Client id is not known here, it is the responsibility of
806
52
                    // the the subscriber impl to replace this with the
807
52
                    // correct client id.
808
52
                    client_operation_id: operation_id.clone(),
809
52
                    action_digest: awaited_action.action_info().digest(),
810
52
                    last_transition_timestamp: now,
811
52
                }),
812
52
                now,
813
            );
814
815
52
            let update_action_result = self
816
52
                .action_db
817
52
                .update_awaited_action(awaited_action.clone())
818
52
                .await
819
52
                .err_tip(|| "In SimpleSchedulerStateManager::update_operation");
820
52
            if let Err(
err5
) = update_action_result {
  Branch (820:20): [True: 0, False: 0]
  Branch (820:20): [True: 0, False: 0]
  Branch (820:20): [Folded - Ignored]
  Branch (820:20): [True: 3, False: 8]
  Branch (820:20): [True: 0, False: 0]
  Branch (820:20): [True: 0, False: 39]
  Branch (820:20): [True: 0, False: 0]
  Branch (820:20): [True: 2, False: 0]
821
                // We use Aborted to signal that the action was not
822
                // updated due to the data being set was not the latest
823
                // but can be retried.
824
5
                if err.code == Code::Aborted {
  Branch (824:20): [True: 0, False: 0]
  Branch (824:20): [True: 0, False: 0]
  Branch (824:20): [Folded - Ignored]
  Branch (824:20): [True: 3, False: 0]
  Branch (824:20): [True: 0, False: 0]
  Branch (824:20): [True: 0, False: 0]
  Branch (824:20): [True: 0, False: 0]
  Branch (824:20): [True: 1, False: 1]
825
4
                    debug!(
826
                        %operation_id,
827
                        retry_count,
828
                        update_type = %update_type_str,
829
4
                        "Version conflict (Aborted), will retry"
830
                    );
831
4
                    last_err = Some(err);
832
4
                    continue;
833
1
                }
834
1
                warn!(
835
                    %operation_id,
836
                    update_type = %update_type_str,
837
                    ?err,
838
1
                    "inner_update_operation FAILED (non-retryable)"
839
                );
840
1
                return Err(err);
841
47
            }
842
843
            // Record execution metrics after successful state update
844
47
            let action_state = awaited_action.state();
845
47
            let instance_name = awaited_action
846
47
                .action_info()
847
47
                .unique_qualifier
848
47
                .instance_name()
849
47
                .as_str();
850
47
            let worker_id = awaited_action
851
47
                .worker_id()
852
47
                .map(std::string::ToString::to_string);
853
47
            let priority = Some(awaited_action.action_info().priority);
854
855
            // Build base attributes for metrics
856
47
            let mut attrs = nativelink_util::metrics::make_execution_attributes(
857
47
                instance_name,
858
47
                worker_id.as_deref(),
859
47
                priority,
860
            );
861
862
            // Add stage attribute
863
47
            let execution_stage: ExecutionStage = (&action_state.stage).into();
864
47
            attrs.push(KeyValue::new(EXECUTION_STAGE, execution_stage));
865
866
            // Record stage transition
867
47
            EXECUTION_METRICS.execution_stage_transitions.add(1, &attrs);
868
869
            // For completed actions, record the completion count with result
870
47
            match &action_state.stage {
871
7
                ActionStage::Completed(action_result) => {
872
7
                    let result = if action_result.exit_code == 0 {
  Branch (872:37): [True: 0, False: 0]
  Branch (872:37): [True: 0, False: 0]
  Branch (872:37): [Folded - Ignored]
  Branch (872:37): [True: 0, False: 1]
  Branch (872:37): [True: 0, False: 0]
  Branch (872:37): [True: 5, False: 1]
  Branch (872:37): [True: 0, False: 0]
  Branch (872:37): [True: 0, False: 0]
873
5
                        ExecutionResult::Success
874
                    } else {
875
2
                        ExecutionResult::Failure
876
                    };
877
7
                    attrs.push(KeyValue::new(EXECUTION_RESULT, result));
878
7
                    EXECUTION_METRICS.execution_completed_count.add(1, &attrs);
879
                }
880
0
                ActionStage::CompletedFromCache(_) => {
881
0
                    attrs.push(KeyValue::new(EXECUTION_RESULT, ExecutionResult::CacheHit));
882
0
                    EXECUTION_METRICS.execution_completed_count.add(1, &attrs);
883
0
                }
884
40
                _ => {}
885
            }
886
887
47
            debug!(
888
                %operation_id,
889
                retry_count,
890
                update_type = %update_type_str,
891
47
                "inner_update_operation SUCCESS"
892
            );
893
47
            return Ok(());
894
        }
895
896
0
        warn!(
897
            %operation_id,
898
            update_type = %update_type_str,
899
            retry_count = MAX_UPDATE_RETRIES,
900
0
            "inner_update_operation EXHAUSTED all retries"
901
        );
902
0
        Err(last_err.unwrap_or_else(|| {
903
0
            make_err!(
904
0
                Code::Internal,
905
                "Failed to update action after {} retries with no error set",
906
                MAX_UPDATE_RETRIES,
907
            )
908
0
        }))
909
54
    }
910
911
31
    async fn inner_add_operation(
912
31
        &self,
913
31
        new_client_operation_id: OperationId,
914
31
        action_info: Arc<ActionInfo>,
915
31
    ) -> Result<T::Subscriber, Error> {
916
31
        self.action_db
917
31
            .add_action(
918
31
                new_client_operation_id,
919
31
                action_info,
920
31
                self.no_event_action_timeout,
921
31
            )
922
31
            .await
923
31
            .err_tip(|| "In SimpleSchedulerStateManager::add_operation")
924
31
    }
925
926
620
    async fn inner_filter_operations<'a, F>(
927
620
        &'a self,
928
620
        filter: OperationFilter,
929
620
        to_action_state_result: F,
930
620
    ) -> Result<ActionStateResultStream<'a>, Error>
931
620
    where
932
620
        F: Fn(T::Subscriber) -> Box<dyn ActionStateResult> + Send + Sync + 'a,
933
620
    {
934
616
        const fn sorted_awaited_action_state_for_flags(
935
616
            stage: OperationStageFlags,
936
616
        ) -> Option<SortedAwaitedActionState> {
937
616
            match stage {
938
0
                OperationStageFlags::CacheCheck => Some(SortedAwaitedActionState::CacheCheck),
939
614
                OperationStageFlags::Queued => Some(SortedAwaitedActionState::Queued),
940
0
                OperationStageFlags::Executing => Some(SortedAwaitedActionState::Executing),
941
0
                OperationStageFlags::Completed => Some(SortedAwaitedActionState::Completed),
942
2
                _ => None,
943
            }
944
616
        }
945
946
620
        if let Some(
operation_id0
) = &filter.operation_id {
  Branch (946:16): [True: 0, False: 0]
  Branch (946:16): [True: 0, False: 0]
  Branch (946:16): [True: 0, False: 0]
  Branch (946:16): [True: 0, False: 0]
  Branch (946:16): [Folded - Ignored]
  Branch (946:16): [True: 0, False: 1]
  Branch (946:16): [True: 0, False: 17]
  Branch (946:16): [True: 0, False: 503]
  Branch (946:16): [True: 0, False: 95]
  Branch (946:16): [True: 0, False: 0]
  Branch (946:16): [True: 0, False: 0]
  Branch (946:16): [True: 0, False: 0]
  Branch (946:16): [True: 0, False: 4]
947
0
            let maybe_subscriber = self
948
0
                .action_db
949
0
                .get_by_operation_id(operation_id)
950
0
                .await
951
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
952
0
            let Some(subscriber) = maybe_subscriber else {
  Branch (952:17): [True: 0, False: 0]
  Branch (952:17): [True: 0, False: 0]
  Branch (952:17): [True: 0, False: 0]
  Branch (952:17): [True: 0, False: 0]
  Branch (952:17): [Folded - Ignored]
  Branch (952:17): [True: 0, False: 0]
  Branch (952:17): [True: 0, False: 0]
  Branch (952:17): [True: 0, False: 0]
  Branch (952:17): [True: 0, False: 0]
  Branch (952:17): [True: 0, False: 0]
  Branch (952:17): [True: 0, False: 0]
  Branch (952:17): [True: 0, False: 0]
  Branch (952:17): [True: 0, False: 0]
953
0
                return Ok(Box::pin(stream::empty()));
954
            };
955
0
            let awaited_action = subscriber
956
0
                .borrow()
957
0
                .await
958
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
959
0
            if !self
  Branch (959:16): [True: 0, False: 0]
  Branch (959:16): [True: 0, False: 0]
  Branch (959:16): [True: 0, False: 0]
  Branch (959:16): [True: 0, False: 0]
  Branch (959:16): [Folded - Ignored]
  Branch (959:16): [True: 0, False: 0]
  Branch (959:16): [True: 0, False: 0]
  Branch (959:16): [True: 0, False: 0]
  Branch (959:16): [True: 0, False: 0]
  Branch (959:16): [True: 0, False: 0]
  Branch (959:16): [True: 0, False: 0]
  Branch (959:16): [True: 0, False: 0]
  Branch (959:16): [True: 0, False: 0]
960
0
                .apply_filter_predicate(&awaited_action, &subscriber, &filter)
961
0
                .await
962
            {
963
0
                return Ok(Box::pin(stream::empty()));
964
0
            }
965
0
            return Ok(Box::pin(stream::once(async move {
966
0
                to_action_state_result(subscriber)
967
0
            })));
968
620
        }
969
620
        if let Some(
client_operation_id4
) = &filter.client_operation_id {
  Branch (969:16): [True: 0, False: 0]
  Branch (969:16): [True: 0, False: 0]
  Branch (969:16): [True: 0, False: 0]
  Branch (969:16): [True: 0, False: 0]
  Branch (969:16): [Folded - Ignored]
  Branch (969:16): [True: 1, False: 0]
  Branch (969:16): [True: 0, False: 17]
  Branch (969:16): [True: 3, False: 500]
  Branch (969:16): [True: 0, False: 95]
  Branch (969:16): [True: 0, False: 0]
  Branch (969:16): [True: 0, False: 0]
  Branch (969:16): [True: 0, False: 0]
  Branch (969:16): [True: 0, False: 4]
970
4
            let maybe_subscriber = self
971
4
                .action_db
972
4
                .get_awaited_action_by_id(client_operation_id)
973
4
                .await
974
4
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
;
975
4
            let Some(subscriber) = maybe_subscriber else {
  Branch (975:17): [True: 0, False: 0]
  Branch (975:17): [True: 0, False: 0]
  Branch (975:17): [True: 0, False: 0]
  Branch (975:17): [True: 0, False: 0]
  Branch (975:17): [Folded - Ignored]
  Branch (975:17): [True: 1, False: 0]
  Branch (975:17): [True: 0, False: 0]
  Branch (975:17): [True: 3, False: 0]
  Branch (975:17): [True: 0, False: 0]
  Branch (975:17): [True: 0, False: 0]
  Branch (975:17): [True: 0, False: 0]
  Branch (975:17): [True: 0, False: 0]
  Branch (975:17): [True: 0, False: 0]
976
0
                return Ok(Box::pin(stream::empty()));
977
            };
978
4
            let awaited_action = subscriber
979
4
                .borrow()
980
4
                .await
981
4
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
;
982
4
            if !self
  Branch (982:16): [True: 0, False: 0]
  Branch (982:16): [True: 0, False: 0]
  Branch (982:16): [True: 0, False: 0]
  Branch (982:16): [True: 0, False: 0]
  Branch (982:16): [Folded - Ignored]
  Branch (982:16): [True: 0, False: 1]
  Branch (982:16): [True: 0, False: 0]
  Branch (982:16): [True: 0, False: 3]
  Branch (982:16): [True: 0, False: 0]
  Branch (982:16): [True: 0, False: 0]
  Branch (982:16): [True: 0, False: 0]
  Branch (982:16): [True: 0, False: 0]
  Branch (982:16): [True: 0, False: 0]
983
4
                .apply_filter_predicate(&awaited_action, &subscriber, &filter)
984
4
                .await
985
            {
986
0
                return Ok(Box::pin(stream::empty()));
987
4
            }
988
4
            return Ok(Box::pin(stream::once(async move {
989
4
                to_action_state_result(subscriber)
990
4
            })));
991
616
        }
992
993
614
        let Some(sorted_awaited_action_state) =
  Branch (993:13): [True: 0, False: 0]
  Branch (993:13): [True: 0, False: 0]
  Branch (993:13): [True: 0, False: 0]
  Branch (993:13): [True: 0, False: 0]
  Branch (993:13): [Folded - Ignored]
  Branch (993:13): [True: 0, False: 0]
  Branch (993:13): [True: 17, False: 0]
  Branch (993:13): [True: 500, False: 0]
  Branch (993:13): [True: 93, False: 2]
  Branch (993:13): [True: 0, False: 0]
  Branch (993:13): [True: 0, False: 0]
  Branch (993:13): [True: 0, False: 0]
  Branch (993:13): [True: 4, False: 0]
994
616
            sorted_awaited_action_state_for_flags(filter.stages)
995
        else {
996
2
            let mut all_items: Vec<_> = self
997
2
                .action_db
998
2
                .get_all_awaited_actions()
999
2
                .await
1000
2
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
1001
2
                .and_then(|awaited_action_subscriber| async move 
{0
1002
0
                    let awaited_action = awaited_action_subscriber
1003
0
                        .borrow()
1004
0
                        .await
1005
0
                        .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
1006
0
                    Ok((awaited_action_subscriber, awaited_action))
1007
0
                })
1008
2
                .try_filter_map(|(subscriber, awaited_action)| 
{0
1009
0
                    let filter = filter.clone();
1010
0
                    async move {
1011
0
                        Ok(self
1012
0
                            .apply_filter_predicate(&awaited_action, &subscriber, &filter)
1013
0
                            .await
1014
0
                            .then_some((subscriber, awaited_action.sort_key())))
1015
0
                    }
1016
0
                })
1017
2
                .try_collect()
1018
2
                .await
1019
2
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
;
1020
0
            match filter.order_by_priority_direction {
1021
0
                Some(OrderDirection::Asc) => all_items.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)),
1022
0
                Some(OrderDirection::Desc) => all_items.sort_unstable_by(|(_, a), (_, b)| b.cmp(a)),
1023
2
                None => {}
1024
            }
1025
2
            return Ok(Box::pin(stream::iter(
1026
2
                all_items
1027
2
                    .into_iter()
1028
2
                    .map(move |(subscriber, _)| 
to_action_state_result0
(
subscriber0
)),
1029
            )));
1030
        };
1031
1032
614
        let desc = 
matches!500
(
1033
114
            filter.order_by_priority_direction,
1034
            Some(OrderDirection::Desc)
1035
        );
1036
614
        let stream = self
1037
614
            .action_db
1038
614
            .get_range_of_actions(
1039
614
                sorted_awaited_action_state,
1040
614
                Bound::Unbounded,
1041
614
                Bound::Unbounded,
1042
614
                desc,
1043
614
            )
1044
614
            .await
1045
614
            .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
1046
614
            .and_then(|awaited_action_subscriber| async move 
{555
1047
555
                let awaited_action = awaited_action_subscriber
1048
555
                    .borrow()
1049
555
                    .await
1050
555
                    .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
;
1051
555
                Ok((awaited_action_subscriber, awaited_action))
1052
1.11k
            })
1053
614
            .try_filter_map(move |(subscriber, awaited_action)| 
{555
1054
555
                let filter = filter.clone();
1055
555
                async move {
1056
555
                    Ok(self
1057
555
                        .apply_filter_predicate(&awaited_action, &subscriber, &filter)
1058
555
                        .await
1059
555
                        .then_some(subscriber))
1060
555
                }
1061
555
            })
1062
614
            .map(move |result| -> Box<dyn ActionStateResult> 
{555
1063
555
                result.map_or_else(
1064
0
                    |e| -> Box<dyn ActionStateResult> { Box::new(ErrorActionStateResult(e)) },
1065
555
                    |v| -> Box<dyn ActionStateResult> { to_action_state_result(v) },
1066
                )
1067
555
            });
1068
614
        Ok(Box::pin(stream))
1069
620
    }
1070
}
1071
1072
#[async_trait]
1073
impl<T, I, NowFn> ClientStateManager for SimpleSchedulerStateManager<T, I, NowFn>
1074
where
1075
    T: AwaitedActionDb,
1076
    I: InstantWrapper,
1077
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
1078
{
1079
    async fn add_action(
1080
        &self,
1081
        client_operation_id: OperationId,
1082
        action_info: Arc<ActionInfo>,
1083
31
    ) -> Result<Box<dyn ActionStateResult>, Error> {
1084
        let sub = self
1085
            .inner_add_operation(client_operation_id, action_info.clone())
1086
            .await?;
1087
1088
        Ok(Box::new(ClientActionStateResult::new(
1089
            sub,
1090
            self.weak_self.clone(),
1091
            self.no_event_action_timeout,
1092
            self.now_fn.clone(),
1093
        )))
1094
31
    }
1095
1096
    async fn filter_operations<'a>(
1097
        &'a self,
1098
        filter: OperationFilter,
1099
504
    ) -> Result<ActionStateResultStream<'a>, Error> {
1100
504
        self.inner_filter_operations(filter, move |rx| {
1101
504
            Box::new(ClientActionStateResult::new(
1102
504
                rx,
1103
504
                self.weak_self.clone(),
1104
504
                self.no_event_action_timeout,
1105
504
                self.now_fn.clone(),
1106
504
            ))
1107
504
        })
1108
        .await
1109
504
    }
1110
1111
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
1112
0
        None
1113
0
    }
1114
}
1115
1116
#[async_trait]
1117
impl<T, I, NowFn> WorkerStateManager for SimpleSchedulerStateManager<T, I, NowFn>
1118
where
1119
    T: AwaitedActionDb,
1120
    I: InstantWrapper,
1121
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
1122
{
1123
    async fn update_operation(
1124
        &self,
1125
        operation_id: &OperationId,
1126
        worker_id: &WorkerId,
1127
        update: UpdateOperationType,
1128
17
    ) -> Result<(), Error> {
1129
        self.inner_update_operation(operation_id, Some(worker_id), update)
1130
            .await
1131
17
    }
1132
}
1133
1134
#[async_trait]
1135
impl<T, I, NowFn> MatchingEngineStateManager for SimpleSchedulerStateManager<T, I, NowFn>
1136
where
1137
    T: AwaitedActionDb,
1138
    I: InstantWrapper,
1139
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
1140
{
1141
    async fn filter_operations<'a>(
1142
        &'a self,
1143
        filter: OperationFilter,
1144
116
    ) -> Result<ActionStateResultStream<'a>, Error> {
1145
55
        self.inner_filter_operations(filter, |rx| {
1146
55
            Box::new(MatchingEngineActionStateResult::new(
1147
55
                rx,
1148
55
                self.weak_self.clone(),
1149
55
                self.no_event_action_timeout,
1150
55
                self.now_fn.clone(),
1151
55
            ))
1152
55
        })
1153
        .await
1154
116
    }
1155
1156
    async fn assign_operation(
1157
        &self,
1158
        operation_id: &OperationId,
1159
        worker_id_or_reason_for_unassign: Result<&WorkerId, Error>,
1160
37
    ) -> Result<(), Error> {
1161
        let (maybe_worker_id, update) = match worker_id_or_reason_for_unassign {
1162
            Ok(worker_id) => (
1163
                Some(worker_id),
1164
                UpdateOperationType::UpdateWithActionStage(ActionStage::Executing),
1165
            ),
1166
            Err(err) => (None, UpdateOperationType::UpdateWithError(err)),
1167
        };
1168
        self.inner_update_operation(operation_id, maybe_worker_id, update)
1169
            .await
1170
37
    }
1171
}