Coverage Report

Created: 2025-12-16 15:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/simple_scheduler_state_manager.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::ops::Bound;
16
use core::time::Duration;
17
use std::string::ToString;
18
use std::sync::{Arc, Weak};
19
20
use async_lock::Mutex;
21
use async_trait::async_trait;
22
use futures::{StreamExt, TryStreamExt, stream};
23
use nativelink_error::{Code, Error, ResultExt, make_err};
24
use nativelink_metric::MetricsComponent;
25
use nativelink_util::action_messages::{
26
    ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ExecutionMetadata,
27
    OperationId, WorkerId,
28
};
29
use nativelink_util::instant_wrapper::InstantWrapper;
30
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
31
use nativelink_util::metrics::{
32
    EXECUTION_METRICS, EXECUTION_RESULT, EXECUTION_STAGE, ExecutionResult, ExecutionStage,
33
};
34
use nativelink_util::operation_state_manager::{
35
    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
36
    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType, WorkerStateManager,
37
};
38
use nativelink_util::origin_event::OriginMetadata;
39
use opentelemetry::KeyValue;
40
use tracing::{debug, info, trace, warn};
41
42
use super::awaited_action_db::{
43
    AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedActionState,
44
};
45
use crate::worker_registry::SharedWorkerRegistry;
46
47
/// Maximum number of times an update to the database
48
/// can fail before giving up.
49
const MAX_UPDATE_RETRIES: usize = 5;
50
51
/// Base delay for exponential backoff on version conflicts (in ms).
52
const BASE_RETRY_DELAY_MS: u64 = 10;
53
54
/// Maximum jitter to add to retry delay (in ms).
55
const MAX_RETRY_JITTER_MS: u64 = 20;
56
57
/// Simple struct that implements the `ActionStateResult` trait and always returns an error.
58
struct ErrorActionStateResult(Error);
59
60
#[async_trait]
61
impl ActionStateResult for ErrorActionStateResult {
62
0
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
63
        Err(self.0.clone())
64
0
    }
65
66
0
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
67
        Err(self.0.clone())
68
0
    }
69
70
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
71
        Err(self.0.clone())
72
0
    }
73
}
74
75
struct ClientActionStateResult<U, T, I, NowFn>
76
where
77
    U: AwaitedActionSubscriber,
78
    T: AwaitedActionDb,
79
    I: InstantWrapper,
80
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
81
{
82
    inner: MatchingEngineActionStateResult<U, T, I, NowFn>,
83
}
84
85
impl<U, T, I, NowFn> ClientActionStateResult<U, T, I, NowFn>
86
where
87
    U: AwaitedActionSubscriber,
88
    T: AwaitedActionDb,
89
    I: InstantWrapper,
90
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
91
{
92
534
    const fn new(
93
534
        sub: U,
94
534
        simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
95
534
        no_event_action_timeout: Duration,
96
534
        now_fn: NowFn,
97
534
    ) -> Self {
98
534
        Self {
99
534
            inner: MatchingEngineActionStateResult::new(
100
534
                sub,
101
534
                simple_scheduler_state_manager,
102
534
                no_event_action_timeout,
103
534
                now_fn,
104
534
            ),
105
534
        }
106
534
    }
107
}
108
109
#[async_trait]
110
impl<U, T, I, NowFn> ActionStateResult for ClientActionStateResult<U, T, I, NowFn>
111
where
112
    U: AwaitedActionSubscriber,
113
    T: AwaitedActionDb,
114
    I: InstantWrapper,
115
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
116
{
117
6
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
118
        self.inner.as_state().await
119
6
    }
120
121
50
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
122
        self.inner.changed().await
123
50
    }
124
125
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
126
        self.inner.as_action_info().await
127
0
    }
128
}
129
130
struct MatchingEngineActionStateResult<U, T, I, NowFn>
131
where
132
    U: AwaitedActionSubscriber,
133
    T: AwaitedActionDb,
134
    I: InstantWrapper,
135
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
136
{
137
    awaited_action_sub: U,
138
    simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
139
    no_event_action_timeout: Duration,
140
    now_fn: NowFn,
141
}
142
impl<U, T, I, NowFn> MatchingEngineActionStateResult<U, T, I, NowFn>
143
where
144
    U: AwaitedActionSubscriber,
145
    T: AwaitedActionDb,
146
    I: InstantWrapper,
147
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
148
{
149
584
    const fn new(
150
584
        awaited_action_sub: U,
151
584
        simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
152
584
        no_event_action_timeout: Duration,
153
584
        now_fn: NowFn,
154
584
    ) -> Self {
155
584
        Self {
156
584
            awaited_action_sub,
157
584
            simple_scheduler_state_manager,
158
584
            no_event_action_timeout,
159
584
            now_fn,
160
584
        }
161
584
    }
162
}
163
164
#[async_trait]
165
impl<U, T, I, NowFn> ActionStateResult for MatchingEngineActionStateResult<U, T, I, NowFn>
166
where
167
    U: AwaitedActionSubscriber,
168
    T: AwaitedActionDb,
169
    I: InstantWrapper,
170
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
171
{
172
38
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
173
        let awaited_action = self
174
            .awaited_action_sub
175
            .borrow()
176
            .await
177
            .err_tip(|| "In MatchingEngineActionStateResult::as_state")?;
178
        Ok((
179
            awaited_action.state().clone(),
180
            awaited_action.maybe_origin_metadata().cloned(),
181
        ))
182
38
    }
183
184
50
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
185
        let mut timeout_attempts = 0;
186
        loop {
187
            tokio::select! {
188
                awaited_action_result = self.awaited_action_sub.changed() => {
189
                    return awaited_action_result
190
                        .err_tip(|| "In MatchingEngineActionStateResult::changed")
191
46
                        .map(|v| (v.state().clone(), v.maybe_origin_metadata().cloned()));
192
                }
193
                () = (self.now_fn)().sleep(self.no_event_action_timeout) => {
194
                    // Timeout happened, do additional checks below.
195
                }
196
            }
197
198
            let awaited_action = self
199
                .awaited_action_sub
200
                .borrow()
201
                .await
202
                .err_tip(|| "In MatchingEngineActionStateResult::changed")?;
203
204
            if matches!(awaited_action.state().stage, ActionStage::Queued) {
205
                // Actions in queued state do not get periodically updated,
206
                // so we don't need to timeout them.
207
                continue;
208
            }
209
210
            let simple_scheduler_state_manager = self
211
                .simple_scheduler_state_manager
212
                .upgrade()
213
0
                .err_tip(|| format!("Failed to upgrade weak reference to SimpleSchedulerStateManager in MatchingEngineActionStateResult::changed at attempt: {timeout_attempts}"))?;
214
215
            // Check if worker is alive via registry before timing out.
216
            let should_timeout = simple_scheduler_state_manager
217
                .should_timeout_operation(&awaited_action)
218
                .await;
219
220
            if !should_timeout {
221
                // Worker is alive, continue waiting for updates
222
                trace!(
223
                    operation_id = %awaited_action.operation_id(),
224
                    "Operation timeout check passed, worker is alive"
225
                );
226
                continue;
227
            }
228
229
            warn!(
230
                ?awaited_action,
231
                "OperationId {} / {} timed out after {} seconds issuing a retry",
232
                awaited_action.operation_id(),
233
                awaited_action.state().client_operation_id,
234
                self.no_event_action_timeout.as_secs_f32(),
235
            );
236
237
            simple_scheduler_state_manager
238
                .timeout_operation_id(awaited_action.operation_id())
239
                .await
240
                .err_tip(|| "In MatchingEngineActionStateResult::changed")?;
241
242
            if timeout_attempts >= MAX_UPDATE_RETRIES {
243
                return Err(make_err!(
244
                    Code::Internal,
245
                    "Failed to update action after {} retries with no error set in MatchingEngineActionStateResult::changed - {} {:?}",
246
                    MAX_UPDATE_RETRIES,
247
                    awaited_action.operation_id(),
248
                    awaited_action.state().stage,
249
                ));
250
            }
251
            timeout_attempts += 1;
252
        }
253
50
    }
254
255
50
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
256
        let awaited_action = self
257
            .awaited_action_sub
258
            .borrow()
259
            .await
260
            .err_tip(|| "In MatchingEngineActionStateResult::as_action_info")?;
261
        Ok((
262
            awaited_action.action_info().clone(),
263
            awaited_action.maybe_origin_metadata().cloned(),
264
        ))
265
50
    }
266
}
267
268
/// `SimpleSchedulerStateManager` is responsible for maintaining the state of the scheduler.
269
/// Scheduler state includes the actions that are queued, active, and recently completed.
270
/// It also includes the workers that are available to execute actions based on allocation
271
/// strategy.
272
#[derive(MetricsComponent, Debug)]
273
pub struct SimpleSchedulerStateManager<T, I, NowFn>
274
where
275
    T: AwaitedActionDb,
276
    I: InstantWrapper,
277
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
278
{
279
    /// Database for storing the state of all actions.
280
    #[metric(group = "action_db")]
281
    action_db: T,
282
283
    /// Maximum number of times a job can be retried.
284
    // TODO(palfrey) This should be a scheduler decorator instead
285
    // of always having it on every SimpleScheduler.
286
    #[metric(help = "Maximum number of times a job can be retried")]
287
    max_job_retries: usize,
288
289
    /// Duration after which an action is considered to be timed out if
290
    /// no event is received.
291
    #[metric(
292
        help = "Duration after which an action is considered to be timed out if no event is received"
293
    )]
294
    no_event_action_timeout: Duration,
295
296
    /// Mark operation as timed out if the worker has not updated in this duration.
297
    /// This is used to prevent operations from being stuck in the queue forever
298
    /// if it is not being processed by any worker.
299
    client_action_timeout: Duration,
300
301
    // A lock to ensure only one timeout operation is running at a time
302
    // on this service.
303
    timeout_operation_mux: Mutex<()>,
304
305
    /// Weak reference to self.
306
    // We use a weak reference to reduce the risk of a memory leak from
307
    // future changes. If this becomes some kind of performance issue,
308
    // we can consider using a strong reference.
309
    weak_self: Weak<Self>,
310
311
    /// Function to get the current time.
312
    now_fn: NowFn,
313
314
    /// Worker registry for checking worker liveness.
315
    worker_registry: Option<SharedWorkerRegistry>,
316
}
317
318
impl<T, I, NowFn> SimpleSchedulerStateManager<T, I, NowFn>
319
where
320
    T: AwaitedActionDb,
321
    I: InstantWrapper,
322
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
323
{
324
25
    pub fn new(
325
25
        max_job_retries: usize,
326
25
        no_event_action_timeout: Duration,
327
25
        client_action_timeout: Duration,
328
25
        action_db: T,
329
25
        now_fn: NowFn,
330
25
        worker_registry: Option<SharedWorkerRegistry>,
331
25
    ) -> Arc<Self> {
332
25
        Arc::new_cyclic(|weak_self| Self {
333
25
            action_db,
334
25
            max_job_retries,
335
25
            no_event_action_timeout,
336
25
            client_action_timeout,
337
25
            timeout_operation_mux: Mutex::new(()),
338
25
            weak_self: weak_self.clone(),
339
25
            now_fn,
340
25
            worker_registry,
341
25
        })
342
25
    }
343
344
1
    pub async fn should_timeout_operation(&self, awaited_action: &AwaitedAction) -> bool {
345
1
        if !
matches!0
(awaited_action.state().stage, ActionStage::Executing) {
  Branch (345:12): [True: 0, False: 0]
  Branch (345:12): [True: 0, False: 0]
  Branch (345:12): [Folded - Ignored]
  Branch (345:12): [True: 0, False: 0]
  Branch (345:12): [True: 0, False: 1]
  Branch (345:12): [True: 0, False: 0]
  Branch (345:12): [True: 0, False: 0]
346
0
            return false;
347
1
        }
348
349
1
        let now = (self.now_fn)().now();
350
351
1
        let registry_alive = if let Some(ref worker_registry) = self.worker_registry {
  Branch (351:37): [True: 0, False: 0]
  Branch (351:37): [True: 0, False: 0]
  Branch (351:37): [Folded - Ignored]
  Branch (351:37): [True: 0, False: 0]
  Branch (351:37): [True: 1, False: 0]
  Branch (351:37): [True: 0, False: 0]
  Branch (351:37): [True: 0, False: 0]
352
1
            if let Some(worker_id) = awaited_action.worker_id() {
  Branch (352:20): [True: 0, False: 0]
  Branch (352:20): [True: 0, False: 0]
  Branch (352:20): [Folded - Ignored]
  Branch (352:20): [True: 0, False: 0]
  Branch (352:20): [True: 1, False: 0]
  Branch (352:20): [True: 0, False: 0]
  Branch (352:20): [True: 0, False: 0]
353
1
                worker_registry
354
1
                    .is_worker_alive(worker_id, self.no_event_action_timeout, now)
355
1
                    .await
356
            } else {
357
0
                false
358
            }
359
        } else {
360
0
            false
361
        };
362
363
1
        if registry_alive {
  Branch (363:12): [True: 0, False: 0]
  Branch (363:12): [True: 0, False: 0]
  Branch (363:12): [Folded - Ignored]
  Branch (363:12): [True: 0, False: 0]
  Branch (363:12): [True: 0, False: 1]
  Branch (363:12): [True: 0, False: 0]
  Branch (363:12): [True: 0, False: 0]
364
0
            return false;
365
1
        }
366
367
1
        let worker_should_update_before = awaited_action
368
1
            .last_worker_updated_timestamp()
369
1
            .checked_add(self.no_event_action_timeout)
370
1
            .unwrap_or(now);
371
372
1
        if worker_should_update_before >= now {
  Branch (372:12): [True: 0, False: 0]
  Branch (372:12): [True: 0, False: 0]
  Branch (372:12): [Folded - Ignored]
  Branch (372:12): [True: 0, False: 0]
  Branch (372:12): [True: 0, False: 1]
  Branch (372:12): [True: 0, False: 0]
  Branch (372:12): [True: 0, False: 0]
373
0
            return false;
374
1
        }
375
376
1
        true
377
1
    }
378
379
554
    async fn apply_filter_predicate(
380
554
        &self,
381
554
        awaited_action: &AwaitedAction,
382
554
        subscriber: &T::Subscriber,
383
554
        filter: &OperationFilter,
384
554
    ) -> bool {
385
        // Note: The caller must filter `client_operation_id`.
386
387
554
        let mut maybe_reloaded_awaited_action: Option<AwaitedAction> = None;
388
554
        let now = (self.now_fn)().now();
389
390
        // Check if client has timed out
391
554
        if awaited_action.last_client_keepalive_timestamp() + self.client_action_timeout < now {
  Branch (391:12): [True: 0, False: 0]
  Branch (391:12): [True: 0, False: 0]
  Branch (391:12): [Folded - Ignored]
  Branch (391:12): [True: 0, False: 6]
  Branch (391:12): [True: 0, False: 546]
  Branch (391:12): [True: 0, False: 0]
  Branch (391:12): [True: 0, False: 2]
392
            // This may change if the version is out of date.
393
0
            let mut timed_out = true;
394
0
            if !awaited_action.state().stage.is_finished() {
  Branch (394:16): [True: 0, False: 0]
  Branch (394:16): [True: 0, False: 0]
  Branch (394:16): [Folded - Ignored]
  Branch (394:16): [True: 0, False: 0]
  Branch (394:16): [True: 0, False: 0]
  Branch (394:16): [True: 0, False: 0]
  Branch (394:16): [True: 0, False: 0]
395
0
                let mut state = awaited_action.state().as_ref().clone();
396
0
                state.stage = ActionStage::Completed(ActionResult {
397
0
                    error: Some(make_err!(
398
0
                        Code::DeadlineExceeded,
399
0
                        "Operation timed out {} seconds of having no more clients listening",
400
0
                        self.client_action_timeout.as_secs_f32(),
401
0
                    )),
402
0
                    ..ActionResult::default()
403
0
                });
404
0
                state.last_transition_timestamp = now;
405
0
                let state = Arc::new(state);
406
                // We may be competing with an client timestamp update, so try
407
                // this a few times.
408
0
                for attempt in 1..=MAX_UPDATE_RETRIES {
409
0
                    let mut new_awaited_action = match &maybe_reloaded_awaited_action {
410
0
                        None => awaited_action.clone(),
411
0
                        Some(reloaded_awaited_action) => reloaded_awaited_action.clone(),
412
                    };
413
0
                    new_awaited_action.worker_set_state(state.clone(), (self.now_fn)().now());
414
0
                    let err = match self
415
0
                        .action_db
416
0
                        .update_awaited_action(new_awaited_action)
417
0
                        .await
418
                    {
419
0
                        Ok(()) => break,
420
0
                        Err(err) => err,
421
                    };
422
                    // Reload from the database if the action was outdated.
423
0
                    let maybe_awaited_action =
424
0
                        if attempt == MAX_UPDATE_RETRIES || err.code != Code::Aborted {
  Branch (424:28): [True: 0, False: 0]
  Branch (424:61): [True: 0, False: 0]
  Branch (424:28): [True: 0, False: 0]
  Branch (424:61): [True: 0, False: 0]
  Branch (424:28): [Folded - Ignored]
  Branch (424:61): [Folded - Ignored]
  Branch (424:28): [True: 0, False: 0]
  Branch (424:61): [True: 0, False: 0]
  Branch (424:28): [True: 0, False: 0]
  Branch (424:61): [True: 0, False: 0]
  Branch (424:28): [True: 0, False: 0]
  Branch (424:61): [True: 0, False: 0]
  Branch (424:28): [True: 0, False: 0]
  Branch (424:61): [True: 0, False: 0]
425
0
                            None
426
                        } else {
427
0
                            subscriber.borrow().await.ok()
428
                        };
429
0
                    if let Some(reloaded_awaited_action) = maybe_awaited_action {
  Branch (429:28): [True: 0, False: 0]
  Branch (429:28): [True: 0, False: 0]
  Branch (429:28): [Folded - Ignored]
  Branch (429:28): [True: 0, False: 0]
  Branch (429:28): [True: 0, False: 0]
  Branch (429:28): [True: 0, False: 0]
  Branch (429:28): [True: 0, False: 0]
430
0
                        maybe_reloaded_awaited_action = Some(reloaded_awaited_action);
431
0
                    } else {
432
0
                        warn!(
433
0
                            "Failed to update action to timed out state after client keepalive timeout. This is ok if multiple schedulers tried to set the state at the same time: {err}",
434
                        );
435
0
                        break;
436
                    }
437
                    // Re-check the predicate after reload.
438
0
                    if maybe_reloaded_awaited_action
  Branch (438:24): [True: 0, False: 0]
  Branch (438:24): [True: 0, False: 0]
  Branch (438:24): [Folded - Ignored]
  Branch (438:24): [True: 0, False: 0]
  Branch (438:24): [True: 0, False: 0]
  Branch (438:24): [True: 0, False: 0]
  Branch (438:24): [True: 0, False: 0]
439
0
                        .as_ref()
440
0
                        .is_some_and(|awaited_action| {
441
0
                            awaited_action.last_client_keepalive_timestamp()
442
0
                                + self.client_action_timeout
443
0
                                >= (self.now_fn)().now()
444
0
                        })
445
                    {
446
0
                        timed_out = false;
447
0
                        break;
448
0
                    } else if maybe_reloaded_awaited_action
  Branch (448:31): [True: 0, False: 0]
  Branch (448:31): [True: 0, False: 0]
  Branch (448:31): [Folded - Ignored]
  Branch (448:31): [True: 0, False: 0]
  Branch (448:31): [True: 0, False: 0]
  Branch (448:31): [True: 0, False: 0]
  Branch (448:31): [True: 0, False: 0]
449
0
                        .as_ref()
450
0
                        .is_some_and(|awaited_action| awaited_action.state().stage.is_finished())
451
                    {
452
0
                        break;
453
0
                    }
454
                }
455
0
            }
456
0
            if timed_out {
  Branch (456:16): [True: 0, False: 0]
  Branch (456:16): [True: 0, False: 0]
  Branch (456:16): [Folded - Ignored]
  Branch (456:16): [True: 0, False: 0]
  Branch (456:16): [True: 0, False: 0]
  Branch (456:16): [True: 0, False: 0]
  Branch (456:16): [True: 0, False: 0]
457
0
                return false;
458
0
            }
459
554
        }
460
        // If the action was reloaded, then use that for the rest of the checks
461
        // instead of the input parameter.
462
554
        let awaited_action = maybe_reloaded_awaited_action
463
554
            .as_ref()
464
554
            .unwrap_or(awaited_action);
465
466
554
        if let Some(
operation_id0
) = &filter.operation_id {
  Branch (466:16): [True: 0, False: 0]
  Branch (466:16): [True: 0, False: 0]
  Branch (466:16): [Folded - Ignored]
  Branch (466:16): [True: 0, False: 6]
  Branch (466:16): [True: 0, False: 546]
  Branch (466:16): [True: 0, False: 0]
  Branch (466:16): [True: 0, False: 2]
467
0
            if operation_id != awaited_action.operation_id() {
  Branch (467:16): [True: 0, False: 0]
  Branch (467:16): [True: 0, False: 0]
  Branch (467:16): [Folded - Ignored]
  Branch (467:16): [True: 0, False: 0]
  Branch (467:16): [True: 0, False: 0]
  Branch (467:16): [True: 0, False: 0]
  Branch (467:16): [True: 0, False: 0]
468
0
                return false;
469
0
            }
470
554
        }
471
472
554
        if filter.worker_id.is_some() && 
filter.worker_id.as_ref()0
!= awaited_action.worker_id() {
  Branch (472:12): [True: 0, False: 0]
  Branch (472:42): [True: 0, False: 0]
  Branch (472:12): [True: 0, False: 0]
  Branch (472:42): [True: 0, False: 0]
  Branch (472:12): [Folded - Ignored]
  Branch (472:42): [Folded - Ignored]
  Branch (472:12): [True: 0, False: 6]
  Branch (472:42): [True: 0, False: 0]
  Branch (472:12): [True: 0, False: 546]
  Branch (472:42): [True: 0, False: 0]
  Branch (472:12): [True: 0, False: 0]
  Branch (472:42): [True: 0, False: 0]
  Branch (472:12): [True: 0, False: 2]
  Branch (472:42): [True: 0, False: 0]
473
0
            return false;
474
554
        }
475
476
        {
477
554
            if let Some(
filter_unique_key0
) = &filter.unique_key {
  Branch (477:20): [True: 0, False: 0]
  Branch (477:20): [True: 0, False: 0]
  Branch (477:20): [Folded - Ignored]
  Branch (477:20): [True: 0, False: 6]
  Branch (477:20): [True: 0, False: 546]
  Branch (477:20): [True: 0, False: 0]
  Branch (477:20): [True: 0, False: 2]
478
0
                match &awaited_action.action_info().unique_qualifier {
479
0
                    ActionUniqueQualifier::Cacheable(unique_key) => {
480
0
                        if filter_unique_key != unique_key {
  Branch (480:28): [True: 0, False: 0]
  Branch (480:28): [True: 0, False: 0]
  Branch (480:28): [Folded - Ignored]
  Branch (480:28): [True: 0, False: 0]
  Branch (480:28): [True: 0, False: 0]
  Branch (480:28): [True: 0, False: 0]
  Branch (480:28): [True: 0, False: 0]
481
0
                            return false;
482
0
                        }
483
                    }
484
                    ActionUniqueQualifier::Uncacheable(_) => {
485
0
                        return false;
486
                    }
487
                }
488
554
            }
489
554
            if let Some(
action_digest0
) = filter.action_digest {
  Branch (489:20): [True: 0, False: 0]
  Branch (489:20): [True: 0, False: 0]
  Branch (489:20): [Folded - Ignored]
  Branch (489:20): [True: 0, False: 6]
  Branch (489:20): [True: 0, False: 546]
  Branch (489:20): [True: 0, False: 0]
  Branch (489:20): [True: 0, False: 2]
490
0
                if action_digest != awaited_action.action_info().digest() {
  Branch (490:20): [True: 0, False: 0]
  Branch (490:20): [True: 0, False: 0]
  Branch (490:20): [Folded - Ignored]
  Branch (490:20): [True: 0, False: 0]
  Branch (490:20): [True: 0, False: 0]
  Branch (490:20): [True: 0, False: 0]
  Branch (490:20): [True: 0, False: 0]
491
0
                    return false;
492
0
                }
493
554
            }
494
        }
495
496
        {
497
554
            let last_worker_update_timestamp = awaited_action.last_worker_updated_timestamp();
498
554
            if let Some(
worker_update_before0
) = filter.worker_update_before {
  Branch (498:20): [True: 0, False: 0]
  Branch (498:20): [True: 0, False: 0]
  Branch (498:20): [Folded - Ignored]
  Branch (498:20): [True: 0, False: 6]
  Branch (498:20): [True: 0, False: 546]
  Branch (498:20): [True: 0, False: 0]
  Branch (498:20): [True: 0, False: 2]
499
0
                if worker_update_before < last_worker_update_timestamp {
  Branch (499:20): [True: 0, False: 0]
  Branch (499:20): [True: 0, False: 0]
  Branch (499:20): [Folded - Ignored]
  Branch (499:20): [True: 0, False: 0]
  Branch (499:20): [True: 0, False: 0]
  Branch (499:20): [True: 0, False: 0]
  Branch (499:20): [True: 0, False: 0]
500
0
                    return false;
501
0
                }
502
554
            }
503
554
            if let Some(
completed_before0
) = filter.completed_before {
  Branch (503:20): [True: 0, False: 0]
  Branch (503:20): [True: 0, False: 0]
  Branch (503:20): [Folded - Ignored]
  Branch (503:20): [True: 0, False: 6]
  Branch (503:20): [True: 0, False: 546]
  Branch (503:20): [True: 0, False: 0]
  Branch (503:20): [True: 0, False: 2]
504
0
                if awaited_action.state().stage.is_finished()
  Branch (504:20): [True: 0, False: 0]
  Branch (504:20): [True: 0, False: 0]
  Branch (504:20): [Folded - Ignored]
  Branch (504:20): [True: 0, False: 0]
  Branch (504:20): [True: 0, False: 0]
  Branch (504:20): [True: 0, False: 0]
  Branch (504:20): [True: 0, False: 0]
505
0
                    && completed_before < last_worker_update_timestamp
  Branch (505:24): [True: 0, False: 0]
  Branch (505:24): [True: 0, False: 0]
  Branch (505:24): [Folded - Ignored]
  Branch (505:24): [True: 0, False: 0]
  Branch (505:24): [True: 0, False: 0]
  Branch (505:24): [True: 0, False: 0]
  Branch (505:24): [True: 0, False: 0]
506
                {
507
0
                    return false;
508
0
                }
509
554
            }
510
554
            if filter.stages != OperationStageFlags::Any {
  Branch (510:16): [True: 0, False: 0]
  Branch (510:16): [True: 0, False: 0]
  Branch (510:16): [Folded - Ignored]
  Branch (510:16): [True: 5, False: 1]
  Branch (510:16): [True: 543, False: 3]
  Branch (510:16): [True: 0, False: 0]
  Branch (510:16): [True: 2, False: 0]
511
550
                let stage_flag = match awaited_action.state().stage {
512
0
                    ActionStage::Unknown => OperationStageFlags::Any,
513
0
                    ActionStage::CacheCheck => OperationStageFlags::CacheCheck,
514
550
                    ActionStage::Queued => OperationStageFlags::Queued,
515
0
                    ActionStage::Executing => OperationStageFlags::Executing,
516
                    ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => {
517
0
                        OperationStageFlags::Completed
518
                    }
519
                };
520
550
                if !filter.stages.intersects(stage_flag) {
  Branch (520:20): [True: 0, False: 0]
  Branch (520:20): [True: 0, False: 0]
  Branch (520:20): [Folded - Ignored]
  Branch (520:20): [True: 0, False: 5]
  Branch (520:20): [True: 0, False: 543]
  Branch (520:20): [True: 0, False: 0]
  Branch (520:20): [True: 0, False: 2]
521
0
                    return false;
522
550
                }
523
4
            }
524
        }
525
526
554
        true
527
554
    }
528
529
    /// Let the scheduler know that an operation has timed out from
530
    /// the client side (ie: worker has not updated in a while).
531
1
    async fn timeout_operation_id(&self, operation_id: &OperationId) -> Result<(), Error> {
532
        // Ensure that only one timeout operation is running at a time.
533
        // Failing to do this could result in the same operation being
534
        // timed out multiple times at the same time.
535
        // Note: We could implement this on a per-operation_id basis, but it is quite
536
        // complex to manage the locks.
537
1
        let _lock = self.timeout_operation_mux.lock().await;
538
539
1
        let awaited_action_subscriber = self
540
1
            .action_db
541
1
            .get_by_operation_id(operation_id)
542
1
            .await
543
1
            .err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")
?0
544
1
            .err_tip(|| 
{0
545
0
                format!("Operation id {operation_id} does not exist in SimpleSchedulerStateManager::timeout_operation_id")
546
0
            })?;
547
548
1
        let awaited_action = awaited_action_subscriber
549
1
            .borrow()
550
1
            .await
551
1
            .err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")
?0
;
552
553
        // If the action is not executing, we should not timeout the action.
554
1
        if !
matches!0
(awaited_action.state().stage, ActionStage::Executing) {
  Branch (554:12): [True: 0, False: 0]
  Branch (554:12): [True: 0, False: 0]
  Branch (554:12): [Folded - Ignored]
  Branch (554:12): [True: 0, False: 0]
  Branch (554:12): [True: 0, False: 1]
  Branch (554:12): [True: 0, False: 0]
  Branch (554:12): [True: 0, False: 0]
555
0
            return Ok(());
556
1
        }
557
558
1
        let now = (self.now_fn)().now();
559
560
        // Check worker liveness via registry if available.
561
1
        let registry_alive = if let Some(ref worker_registry) = self.worker_registry {
  Branch (561:37): [True: 0, False: 0]
  Branch (561:37): [True: 0, False: 0]
  Branch (561:37): [Folded - Ignored]
  Branch (561:37): [True: 0, False: 0]
  Branch (561:37): [True: 1, False: 0]
  Branch (561:37): [True: 0, False: 0]
  Branch (561:37): [True: 0, False: 0]
562
1
            if let Some(worker_id) = awaited_action.worker_id() {
  Branch (562:20): [True: 0, False: 0]
  Branch (562:20): [True: 0, False: 0]
  Branch (562:20): [Folded - Ignored]
  Branch (562:20): [True: 0, False: 0]
  Branch (562:20): [True: 1, False: 0]
  Branch (562:20): [True: 0, False: 0]
  Branch (562:20): [True: 0, False: 0]
563
1
                worker_registry
564
1
                    .is_worker_alive(worker_id, self.no_event_action_timeout, now)
565
1
                    .await
566
            } else {
567
0
                false
568
            }
569
        } else {
570
0
            false
571
        };
572
573
1
        let timestamp_alive = {
574
1
            let worker_should_update_before = awaited_action
575
1
                .last_worker_updated_timestamp()
576
1
                .checked_add(self.no_event_action_timeout)
577
1
                .unwrap_or(now);
578
1
            worker_should_update_before >= now
579
        };
580
581
1
        if registry_alive || timestamp_alive {
  Branch (581:12): [True: 0, False: 0]
  Branch (581:30): [True: 0, False: 0]
  Branch (581:12): [True: 0, False: 0]
  Branch (581:30): [True: 0, False: 0]
  Branch (581:12): [Folded - Ignored]
  Branch (581:30): [Folded - Ignored]
  Branch (581:12): [True: 0, False: 0]
  Branch (581:30): [True: 0, False: 0]
  Branch (581:12): [True: 0, False: 1]
  Branch (581:30): [True: 0, False: 1]
  Branch (581:12): [True: 0, False: 0]
  Branch (581:30): [True: 0, False: 0]
  Branch (581:12): [True: 0, False: 0]
  Branch (581:30): [True: 0, False: 0]
582
0
            trace!(
583
                %operation_id,
584
0
                worker_id = ?awaited_action.worker_id(),
585
                registry_alive,
586
                timestamp_alive,
587
0
                "Worker is alive, operation not timed out"
588
            );
589
0
            return Ok(());
590
1
        }
591
592
1
        debug!(
593
            %operation_id,
594
1
            worker_id = ?awaited_action.worker_id(),
595
            registry_alive,
596
            timestamp_alive,
597
1
            "Worker not alive via registry or timestamp, timing out operation"
598
        );
599
600
1
        self.assign_operation(
601
1
            operation_id,
602
1
            Err(make_err!(
603
1
                Code::DeadlineExceeded,
604
1
                "Operation timed out after {} seconds",
605
1
                self.no_event_action_timeout.as_secs_f32(),
606
1
            )),
607
1
        )
608
1
        .await
609
1
    }
610
611
50
    async fn inner_update_operation(
612
50
        &self,
613
50
        operation_id: &OperationId,
614
50
        maybe_worker_id: Option<&WorkerId>,
615
50
        update: UpdateOperationType,
616
50
    ) -> Result<(), Error> {
617
50
        let update_type_str = match &update {
618
0
            UpdateOperationType::KeepAlive => "KeepAlive",
619
37
            UpdateOperationType::UpdateWithActionStage(stage) => match stage {
620
0
                ActionStage::Queued => "Stage:Queued",
621
32
                ActionStage::Executing => "Stage:Executing",
622
5
                ActionStage::Completed(_) => "Stage:Completed",
623
0
                ActionStage::CompletedFromCache(_) => "Stage:CompletedFromCache",
624
0
                ActionStage::CacheCheck => "Stage:CacheCheck",
625
0
                ActionStage::Unknown => "Stage:Unknown",
626
            },
627
12
            UpdateOperationType::UpdateWithError(_) => "Error",
628
0
            UpdateOperationType::UpdateWithDisconnect => "Disconnect",
629
1
            UpdateOperationType::ExecutionComplete => "ExecutionComplete",
630
        };
631
632
50
        debug!(
633
            %operation_id,
634
            ?maybe_worker_id,
635
            update_type = %update_type_str,
636
50
            "inner_update_operation START"
637
        );
638
639
50
        let mut last_err = None;
640
50
        let mut retry_count = 0;
641
51
        for _ in 0..MAX_UPDATE_RETRIES {
642
51
            retry_count += 1;
643
51
            if retry_count > 1 {
  Branch (643:16): [True: 0, False: 0]
  Branch (643:16): [True: 0, False: 0]
  Branch (643:16): [Folded - Ignored]
  Branch (643:16): [True: 0, False: 8]
  Branch (643:16): [True: 0, False: 1]
  Branch (643:16): [True: 0, False: 39]
  Branch (643:16): [True: 0, False: 0]
  Branch (643:16): [True: 1, False: 2]
644
1
                let base_delay = BASE_RETRY_DELAY_MS * (1 << (retry_count - 2).min(4));
645
1
                let jitter = std::time::SystemTime::now()
646
1
                    .duration_since(std::time::UNIX_EPOCH)
647
1
                    .map(|d| u64::try_from(d.as_nanos()).expect("u64 error") % MAX_RETRY_JITTER_MS)
648
1
                    .unwrap_or(0);
649
1
                let delay = Duration::from_millis(base_delay + jitter);
650
651
1
                warn!(
652
                    %operation_id,
653
                    ?maybe_worker_id,
654
                    retry_count,
655
1
                    delay_ms = delay.as_millis(),
656
                    update_type = %update_type_str,
657
1
                    "Retrying operation update due to version conflict (with backoff)"
658
                );
659
660
1
                tokio::time::sleep(delay).await;
661
50
            }
662
51
            let maybe_awaited_action_subscriber = self
663
51
                .action_db
664
51
                .get_by_operation_id(operation_id)
665
51
                .await
666
51
                .err_tip(|| "In SimpleSchedulerStateManager::update_operation")
?0
;
667
51
            let Some(
awaited_action_subscriber49
) = maybe_awaited_action_subscriber else {
  Branch (667:17): [True: 0, False: 0]
  Branch (667:17): [True: 0, False: 0]
  Branch (667:17): [Folded - Ignored]
  Branch (667:17): [True: 8, False: 0]
  Branch (667:17): [True: 0, False: 1]
  Branch (667:17): [True: 39, False: 0]
  Branch (667:17): [True: 0, False: 0]
  Branch (667:17): [True: 2, False: 1]
668
                // No action found. It is ok if the action was not found. It
669
                // probably means that the action was dropped, but worker was
670
                // still processing it.
671
2
                warn!(
672
                    %operation_id,
673
2
                    "Unable to update action due to it being missing, probably dropped"
674
                );
675
2
                return Ok(());
676
            };
677
678
49
            let mut awaited_action = awaited_action_subscriber
679
49
                .borrow()
680
49
                .await
681
49
                .err_tip(|| "In SimpleSchedulerStateManager::update_operation")
?0
;
682
683
            // Make sure the worker id matches the awaited action worker id.
684
            // This might happen if the worker sending the update is not the
685
            // worker that was assigned.
686
49
            if awaited_action.worker_id().is_some()
  Branch (686:16): [True: 0, False: 0]
  Branch (686:16): [True: 0, False: 0]
  Branch (686:16): [Folded - Ignored]
  Branch (686:16): [True: 4, False: 4]
  Branch (686:16): [True: 0, False: 0]
  Branch (686:16): [True: 13, False: 26]
  Branch (686:16): [True: 0, False: 0]
  Branch (686:16): [True: 0, False: 2]
687
17
                && maybe_worker_id.is_some()
  Branch (687:20): [True: 0, False: 0]
  Branch (687:20): [True: 0, False: 0]
  Branch (687:20): [Folded - Ignored]
  Branch (687:20): [True: 4, False: 0]
  Branch (687:20): [True: 0, False: 0]
  Branch (687:20): [True: 12, False: 1]
  Branch (687:20): [True: 0, False: 0]
  Branch (687:20): [True: 0, False: 0]
688
16
                && maybe_worker_id != awaited_action.worker_id()
  Branch (688:20): [True: 0, False: 0]
  Branch (688:20): [True: 0, False: 0]
  Branch (688:20): [Folded - Ignored]
  Branch (688:20): [True: 0, False: 4]
  Branch (688:20): [True: 0, False: 0]
  Branch (688:20): [True: 0, False: 12]
  Branch (688:20): [True: 0, False: 0]
  Branch (688:20): [True: 0, False: 0]
689
            {
690
                // If another worker is already assigned to the action, another
691
                // worker probably picked up the action. We should not update the
692
                // action in this case and abort this operation.
693
0
                let err = make_err!(
694
0
                    Code::Aborted,
695
                    "Worker ids do not match - {:?} != {:?} for {:?}",
696
                    maybe_worker_id,
697
0
                    awaited_action.worker_id(),
698
                    awaited_action,
699
                );
700
0
                info!(
701
0
                    "Worker ids do not match - {:?} != {:?} for {:?}. This is probably due to another worker picking up the action.",
702
                    maybe_worker_id,
703
0
                    awaited_action.worker_id(),
704
                    awaited_action,
705
                );
706
0
                return Err(err);
707
49
            }
708
709
            // Make sure we don't update an action that is already completed.
710
49
            if awaited_action.state().stage.is_finished() {
  Branch (710:16): [True: 0, False: 0]
  Branch (710:16): [True: 0, False: 0]
  Branch (710:16): [Folded - Ignored]
  Branch (710:16): [True: 0, False: 8]
  Branch (710:16): [True: 0, False: 0]
  Branch (710:16): [True: 0, False: 39]
  Branch (710:16): [True: 0, False: 0]
  Branch (710:16): [True: 0, False: 2]
711
0
                match &update {
712
                    UpdateOperationType::UpdateWithDisconnect | UpdateOperationType::KeepAlive => {
713
                        // No need to error a keep-alive when it's completed, it's just
714
                        // unnecessary log noise.
715
0
                        return Ok(());
716
                    }
717
                    _ => {
718
0
                        return Err(make_err!(
719
0
                            Code::Internal,
720
0
                            "Action {operation_id} is already completed with state {:?} - maybe_worker_id: {:?}",
721
0
                            awaited_action.state().stage,
722
0
                            maybe_worker_id,
723
0
                        ));
724
                    }
725
                }
726
49
            }
727
728
49
            let stage = match &update {
729
                UpdateOperationType::KeepAlive => {
730
0
                    awaited_action.worker_keep_alive((self.now_fn)().now());
731
0
                    match self
732
0
                        .action_db
733
0
                        .update_awaited_action(awaited_action)
734
0
                        .await
735
0
                        .err_tip(|| "Failed to send KeepAlive in SimpleSchedulerStateManager::update_operation") {
736
                        // Try again if there was a version mismatch.
737
0
                        Err(err) if err.code == Code::Aborted => {
  Branch (737:37): [True: 0, False: 0]
  Branch (737:37): [True: 0, False: 0]
  Branch (737:37): [Folded - Ignored]
  Branch (737:37): [True: 0, False: 0]
  Branch (737:37): [True: 0, False: 0]
  Branch (737:37): [True: 0, False: 0]
  Branch (737:37): [True: 0, False: 0]
  Branch (737:37): [True: 0, False: 0]
738
0
                            last_err = Some(err);
739
0
                            continue;
740
                        }
741
0
                        result => return result,
742
                    }
743
                }
744
37
                UpdateOperationType::UpdateWithActionStage(stage) => stage.clone(),
745
12
                UpdateOperationType::UpdateWithError(err) => {
746
                    // Don't count a backpressure failure as an attempt for an action.
747
12
                    let due_to_backpressure = err.code == Code::ResourceExhausted;
748
12
                    if !due_to_backpressure {
  Branch (748:24): [True: 0, False: 0]
  Branch (748:24): [True: 0, False: 0]
  Branch (748:24): [Folded - Ignored]
  Branch (748:24): [True: 4, False: 0]
  Branch (748:24): [True: 0, False: 0]
  Branch (748:24): [True: 8, False: 0]
  Branch (748:24): [True: 0, False: 0]
  Branch (748:24): [True: 0, False: 0]
749
12
                        awaited_action.attempts += 1;
750
12
                    
}0
751
752
12
                    if awaited_action.attempts > self.max_job_retries {
  Branch (752:24): [True: 0, False: 0]
  Branch (752:24): [True: 0, False: 0]
  Branch (752:24): [Folded - Ignored]
  Branch (752:24): [True: 1, False: 3]
  Branch (752:24): [True: 0, False: 0]
  Branch (752:24): [True: 1, False: 7]
  Branch (752:24): [True: 0, False: 0]
  Branch (752:24): [True: 0, False: 0]
753
2
                        ActionStage::Completed(ActionResult {
754
2
                            execution_metadata: ExecutionMetadata {
755
2
                                worker: maybe_worker_id.map_or_else(String::default, ToString::to_string),
756
2
                                ..ExecutionMetadata::default()
757
2
                            },
758
2
                            error: Some(err.clone().merge(make_err!(
759
2
                                Code::Internal,
760
2
                                "Job cancelled because it attempted to execute too many times {} > {} times {}",
761
2
                                awaited_action.attempts,
762
2
                                self.max_job_retries,
763
2
                                format!("for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"),
764
2
                            ))),
765
2
                            ..ActionResult::default()
766
2
                        })
767
                    } else {
768
10
                        ActionStage::Queued
769
                    }
770
                }
771
0
                UpdateOperationType::UpdateWithDisconnect => ActionStage::Queued,
772
                // We shouldn't get here, but we just ignore it if we do.
773
                UpdateOperationType::ExecutionComplete => {
774
0
                    warn!("inner_update_operation got an ExecutionComplete, that's unexpected.");
775
0
                    return Ok(());
776
                }
777
            };
778
49
            let now = (self.now_fn)().now();
779
49
            if 
matches!39
(stage, ActionStage::Queued) {
780
10
                // If the action is queued, we need to unset the worker id regardless of
781
10
                // which worker sent the update.
782
10
                awaited_action.set_worker_id(None, now);
783
39
            } else {
784
39
                awaited_action.set_worker_id(maybe_worker_id.cloned(), now);
785
39
            }
786
49
            awaited_action.worker_set_state(
787
49
                Arc::new(ActionState {
788
49
                    stage,
789
49
                    // Client id is not known here, it is the responsibility of
790
49
                    // the the subscriber impl to replace this with the
791
49
                    // correct client id.
792
49
                    client_operation_id: operation_id.clone(),
793
49
                    action_digest: awaited_action.action_info().digest(),
794
49
                    last_transition_timestamp: now,
795
49
                }),
796
49
                now,
797
            );
798
799
49
            let update_action_result = self
800
49
                .action_db
801
49
                .update_awaited_action(awaited_action.clone())
802
49
                .await
803
49
                .err_tip(|| "In SimpleSchedulerStateManager::update_operation");
804
49
            if let Err(
err2
) = update_action_result {
  Branch (804:20): [True: 0, False: 0]
  Branch (804:20): [True: 0, False: 0]
  Branch (804:20): [Folded - Ignored]
  Branch (804:20): [True: 0, False: 8]
  Branch (804:20): [True: 0, False: 0]
  Branch (804:20): [True: 0, False: 39]
  Branch (804:20): [True: 0, False: 0]
  Branch (804:20): [True: 2, False: 0]
805
                // We use Aborted to signal that the action was not
806
                // updated due to the data being set was not the latest
807
                // but can be retried.
808
2
                if err.code == Code::Aborted {
  Branch (808:20): [True: 0, False: 0]
  Branch (808:20): [True: 0, False: 0]
  Branch (808:20): [Folded - Ignored]
  Branch (808:20): [True: 0, False: 0]
  Branch (808:20): [True: 0, False: 0]
  Branch (808:20): [True: 0, False: 0]
  Branch (808:20): [True: 0, False: 0]
  Branch (808:20): [True: 1, False: 1]
809
1
                    debug!(
810
                        %operation_id,
811
                        retry_count,
812
                        update_type = %update_type_str,
813
1
                        "Version conflict (Aborted), will retry"
814
                    );
815
1
                    last_err = Some(err);
816
1
                    continue;
817
1
                }
818
1
                warn!(
819
                    %operation_id,
820
                    update_type = %update_type_str,
821
                    ?err,
822
1
                    "inner_update_operation FAILED (non-retryable)"
823
                );
824
1
                return Err(err);
825
47
            }
826
827
            // Record execution metrics after successful state update
828
47
            let action_state = awaited_action.state();
829
47
            let instance_name = awaited_action
830
47
                .action_info()
831
47
                .unique_qualifier
832
47
                .instance_name()
833
47
                .as_str();
834
47
            let worker_id = awaited_action
835
47
                .worker_id()
836
47
                .map(std::string::ToString::to_string);
837
47
            let priority = Some(awaited_action.action_info().priority);
838
839
            // Build base attributes for metrics
840
47
            let mut attrs = nativelink_util::metrics::make_execution_attributes(
841
47
                instance_name,
842
47
                worker_id.as_deref(),
843
47
                priority,
844
            );
845
846
            // Add stage attribute
847
47
            let execution_stage: ExecutionStage = (&action_state.stage).into();
848
47
            attrs.push(KeyValue::new(EXECUTION_STAGE, execution_stage));
849
850
            // Record stage transition
851
47
            EXECUTION_METRICS.execution_stage_transitions.add(1, &attrs);
852
853
            // For completed actions, record the completion count with result
854
47
            match &action_state.stage {
855
7
                ActionStage::Completed(action_result) => {
856
7
                    let result = if action_result.exit_code == 0 {
  Branch (856:37): [True: 0, False: 0]
  Branch (856:37): [True: 0, False: 0]
  Branch (856:37): [Folded - Ignored]
  Branch (856:37): [True: 0, False: 1]
  Branch (856:37): [True: 0, False: 0]
  Branch (856:37): [True: 5, False: 1]
  Branch (856:37): [True: 0, False: 0]
  Branch (856:37): [True: 0, False: 0]
857
5
                        ExecutionResult::Success
858
                    } else {
859
2
                        ExecutionResult::Failure
860
                    };
861
7
                    attrs.push(KeyValue::new(EXECUTION_RESULT, result));
862
7
                    EXECUTION_METRICS.execution_completed_count.add(1, &attrs);
863
                }
864
0
                ActionStage::CompletedFromCache(_) => {
865
0
                    attrs.push(KeyValue::new(EXECUTION_RESULT, ExecutionResult::CacheHit));
866
0
                    EXECUTION_METRICS.execution_completed_count.add(1, &attrs);
867
0
                }
868
40
                _ => {}
869
            }
870
871
47
            debug!(
872
                %operation_id,
873
                retry_count,
874
                update_type = %update_type_str,
875
47
                "inner_update_operation SUCCESS"
876
            );
877
47
            return Ok(());
878
        }
879
880
0
        warn!(
881
            %operation_id,
882
            update_type = %update_type_str,
883
            retry_count = MAX_UPDATE_RETRIES,
884
0
            "inner_update_operation EXHAUSTED all retries"
885
        );
886
0
        Err(last_err.unwrap_or_else(|| {
887
0
            make_err!(
888
0
                Code::Internal,
889
                "Failed to update action after {} retries with no error set",
890
                MAX_UPDATE_RETRIES,
891
            )
892
0
        }))
893
50
    }
894
895
30
    async fn inner_add_operation(
896
30
        &self,
897
30
        new_client_operation_id: OperationId,
898
30
        action_info: Arc<ActionInfo>,
899
30
    ) -> Result<T::Subscriber, Error> {
900
30
        self.action_db
901
30
            .add_action(
902
30
                new_client_operation_id,
903
30
                action_info,
904
30
                self.no_event_action_timeout,
905
30
            )
906
30
            .await
907
30
            .err_tip(|| "In SimpleSchedulerStateManager::add_operation")
908
30
    }
909
910
616
    async fn inner_filter_operations<'a, F>(
911
616
        &'a self,
912
616
        filter: OperationFilter,
913
616
        to_action_state_result: F,
914
616
    ) -> Result<ActionStateResultStream<'a>, Error>
915
616
    where
916
616
        F: Fn(T::Subscriber) -> Box<dyn ActionStateResult> + Send + Sync + 'a,
917
616
    {
918
612
        const fn sorted_awaited_action_state_for_flags(
919
612
            stage: OperationStageFlags,
920
612
        ) -> Option<SortedAwaitedActionState> {
921
612
            match stage {
922
0
                OperationStageFlags::CacheCheck => Some(SortedAwaitedActionState::CacheCheck),
923
611
                OperationStageFlags::Queued => Some(SortedAwaitedActionState::Queued),
924
0
                OperationStageFlags::Executing => Some(SortedAwaitedActionState::Executing),
925
0
                OperationStageFlags::Completed => Some(SortedAwaitedActionState::Completed),
926
1
                _ => None,
927
            }
928
612
        }
929
930
616
        if let Some(
operation_id0
) = &filter.operation_id {
  Branch (930:16): [True: 0, False: 0]
  Branch (930:16): [True: 0, False: 0]
  Branch (930:16): [True: 0, False: 0]
  Branch (930:16): [True: 0, False: 0]
  Branch (930:16): [Folded - Ignored]
  Branch (930:16): [True: 0, False: 1]
  Branch (930:16): [True: 0, False: 17]
  Branch (930:16): [True: 0, False: 503]
  Branch (930:16): [True: 0, False: 91]
  Branch (930:16): [True: 0, False: 0]
  Branch (930:16): [True: 0, False: 0]
  Branch (930:16): [True: 0, False: 0]
  Branch (930:16): [True: 0, False: 4]
931
0
            let maybe_subscriber = self
932
0
                .action_db
933
0
                .get_by_operation_id(operation_id)
934
0
                .await
935
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
936
0
            let Some(subscriber) = maybe_subscriber else {
  Branch (936:17): [True: 0, False: 0]
  Branch (936:17): [True: 0, False: 0]
  Branch (936:17): [True: 0, False: 0]
  Branch (936:17): [True: 0, False: 0]
  Branch (936:17): [Folded - Ignored]
  Branch (936:17): [True: 0, False: 0]
  Branch (936:17): [True: 0, False: 0]
  Branch (936:17): [True: 0, False: 0]
  Branch (936:17): [True: 0, False: 0]
  Branch (936:17): [True: 0, False: 0]
  Branch (936:17): [True: 0, False: 0]
  Branch (936:17): [True: 0, False: 0]
  Branch (936:17): [True: 0, False: 0]
937
0
                return Ok(Box::pin(stream::empty()));
938
            };
939
0
            let awaited_action = subscriber
940
0
                .borrow()
941
0
                .await
942
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
943
0
            if !self
  Branch (943:16): [True: 0, False: 0]
  Branch (943:16): [True: 0, False: 0]
  Branch (943:16): [True: 0, False: 0]
  Branch (943:16): [True: 0, False: 0]
  Branch (943:16): [Folded - Ignored]
  Branch (943:16): [True: 0, False: 0]
  Branch (943:16): [True: 0, False: 0]
  Branch (943:16): [True: 0, False: 0]
  Branch (943:16): [True: 0, False: 0]
  Branch (943:16): [True: 0, False: 0]
  Branch (943:16): [True: 0, False: 0]
  Branch (943:16): [True: 0, False: 0]
  Branch (943:16): [True: 0, False: 0]
944
0
                .apply_filter_predicate(&awaited_action, &subscriber, &filter)
945
0
                .await
946
            {
947
0
                return Ok(Box::pin(stream::empty()));
948
0
            }
949
0
            return Ok(Box::pin(stream::once(async move {
950
0
                to_action_state_result(subscriber)
951
0
            })));
952
616
        }
953
616
        if let Some(
client_operation_id4
) = &filter.client_operation_id {
  Branch (953:16): [True: 0, False: 0]
  Branch (953:16): [True: 0, False: 0]
  Branch (953:16): [True: 0, False: 0]
  Branch (953:16): [True: 0, False: 0]
  Branch (953:16): [Folded - Ignored]
  Branch (953:16): [True: 1, False: 0]
  Branch (953:16): [True: 0, False: 17]
  Branch (953:16): [True: 3, False: 500]
  Branch (953:16): [True: 0, False: 91]
  Branch (953:16): [True: 0, False: 0]
  Branch (953:16): [True: 0, False: 0]
  Branch (953:16): [True: 0, False: 0]
  Branch (953:16): [True: 0, False: 4]
954
4
            let maybe_subscriber = self
955
4
                .action_db
956
4
                .get_awaited_action_by_id(client_operation_id)
957
4
                .await
958
4
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
;
959
4
            let Some(subscriber) = maybe_subscriber else {
  Branch (959:17): [True: 0, False: 0]
  Branch (959:17): [True: 0, False: 0]
  Branch (959:17): [True: 0, False: 0]
  Branch (959:17): [True: 0, False: 0]
  Branch (959:17): [Folded - Ignored]
  Branch (959:17): [True: 1, False: 0]
  Branch (959:17): [True: 0, False: 0]
  Branch (959:17): [True: 3, False: 0]
  Branch (959:17): [True: 0, False: 0]
  Branch (959:17): [True: 0, False: 0]
  Branch (959:17): [True: 0, False: 0]
  Branch (959:17): [True: 0, False: 0]
  Branch (959:17): [True: 0, False: 0]
960
0
                return Ok(Box::pin(stream::empty()));
961
            };
962
4
            let awaited_action = subscriber
963
4
                .borrow()
964
4
                .await
965
4
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
;
966
4
            if !self
  Branch (966:16): [True: 0, False: 0]
  Branch (966:16): [True: 0, False: 0]
  Branch (966:16): [True: 0, False: 0]
  Branch (966:16): [True: 0, False: 0]
  Branch (966:16): [Folded - Ignored]
  Branch (966:16): [True: 0, False: 1]
  Branch (966:16): [True: 0, False: 0]
  Branch (966:16): [True: 0, False: 3]
  Branch (966:16): [True: 0, False: 0]
  Branch (966:16): [True: 0, False: 0]
  Branch (966:16): [True: 0, False: 0]
  Branch (966:16): [True: 0, False: 0]
  Branch (966:16): [True: 0, False: 0]
967
4
                .apply_filter_predicate(&awaited_action, &subscriber, &filter)
968
4
                .await
969
            {
970
0
                return Ok(Box::pin(stream::empty()));
971
4
            }
972
4
            return Ok(Box::pin(stream::once(async move {
973
4
                to_action_state_result(subscriber)
974
4
            })));
975
612
        }
976
977
611
        let Some(sorted_awaited_action_state) =
  Branch (977:13): [True: 0, False: 0]
  Branch (977:13): [True: 0, False: 0]
  Branch (977:13): [True: 0, False: 0]
  Branch (977:13): [True: 0, False: 0]
  Branch (977:13): [Folded - Ignored]
  Branch (977:13): [True: 0, False: 0]
  Branch (977:13): [True: 17, False: 0]
  Branch (977:13): [True: 500, False: 0]
  Branch (977:13): [True: 90, False: 1]
  Branch (977:13): [True: 0, False: 0]
  Branch (977:13): [True: 0, False: 0]
  Branch (977:13): [True: 0, False: 0]
  Branch (977:13): [True: 4, False: 0]
978
612
            sorted_awaited_action_state_for_flags(filter.stages)
979
        else {
980
1
            let mut all_items: Vec<_> = self
981
1
                .action_db
982
1
                .get_all_awaited_actions()
983
1
                .await
984
1
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
985
1
                .and_then(|awaited_action_subscriber| async move 
{0
986
0
                    let awaited_action = awaited_action_subscriber
987
0
                        .borrow()
988
0
                        .await
989
0
                        .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
990
0
                    Ok((awaited_action_subscriber, awaited_action))
991
0
                })
992
1
                .try_filter_map(|(subscriber, awaited_action)| 
{0
993
0
                    let filter = filter.clone();
994
0
                    async move {
995
0
                        Ok(self
996
0
                            .apply_filter_predicate(&awaited_action, &subscriber, &filter)
997
0
                            .await
998
0
                            .then_some((subscriber, awaited_action.sort_key())))
999
0
                    }
1000
0
                })
1001
1
                .try_collect()
1002
1
                .await
1003
1
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
;
1004
0
            match filter.order_by_priority_direction {
1005
0
                Some(OrderDirection::Asc) => all_items.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)),
1006
0
                Some(OrderDirection::Desc) => all_items.sort_unstable_by(|(_, a), (_, b)| b.cmp(a)),
1007
1
                None => {}
1008
            }
1009
1
            return Ok(Box::pin(stream::iter(
1010
1
                all_items
1011
1
                    .into_iter()
1012
1
                    .map(move |(subscriber, _)| 
to_action_state_result0
(
subscriber0
)),
1013
            )));
1014
        };
1015
1016
611
        let desc = 
matches!500
(
1017
111
            filter.order_by_priority_direction,
1018
            Some(OrderDirection::Desc)
1019
        );
1020
611
        let stream = self
1021
611
            .action_db
1022
611
            .get_range_of_actions(
1023
611
                sorted_awaited_action_state,
1024
611
                Bound::Unbounded,
1025
611
                Bound::Unbounded,
1026
611
                desc,
1027
611
            )
1028
611
            .await
1029
611
            .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
1030
611
            .and_then(|awaited_action_subscriber| async move 
{550
1031
550
                let awaited_action = awaited_action_subscriber
1032
550
                    .borrow()
1033
550
                    .await
1034
550
                    .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
;
1035
550
                Ok((awaited_action_subscriber, awaited_action))
1036
1.10k
            })
1037
611
            .try_filter_map(move |(subscriber, awaited_action)| 
{550
1038
550
                let filter = filter.clone();
1039
550
                async move {
1040
550
                    Ok(self
1041
550
                        .apply_filter_predicate(&awaited_action, &subscriber, &filter)
1042
550
                        .await
1043
550
                        .then_some(subscriber))
1044
550
                }
1045
550
            })
1046
611
            .map(move |result| -> Box<dyn ActionStateResult> 
{550
1047
550
                result.map_or_else(
1048
0
                    |e| -> Box<dyn ActionStateResult> { Box::new(ErrorActionStateResult(e)) },
1049
550
                    |v| -> Box<dyn ActionStateResult> { to_action_state_result(v) },
1050
                )
1051
550
            });
1052
611
        Ok(Box::pin(stream))
1053
616
    }
1054
}
1055
1056
#[async_trait]
1057
impl<T, I, NowFn> ClientStateManager for SimpleSchedulerStateManager<T, I, NowFn>
1058
where
1059
    T: AwaitedActionDb,
1060
    I: InstantWrapper,
1061
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
1062
{
1063
    async fn add_action(
1064
        &self,
1065
        client_operation_id: OperationId,
1066
        action_info: Arc<ActionInfo>,
1067
30
    ) -> Result<Box<dyn ActionStateResult>, Error> {
1068
        let sub = self
1069
            .inner_add_operation(client_operation_id, action_info.clone())
1070
            .await?;
1071
1072
        Ok(Box::new(ClientActionStateResult::new(
1073
            sub,
1074
            self.weak_self.clone(),
1075
            self.no_event_action_timeout,
1076
            self.now_fn.clone(),
1077
        )))
1078
30
    }
1079
1080
    async fn filter_operations<'a>(
1081
        &'a self,
1082
        filter: OperationFilter,
1083
504
    ) -> Result<ActionStateResultStream<'a>, Error> {
1084
504
        self.inner_filter_operations(filter, move |rx| {
1085
504
            Box::new(ClientActionStateResult::new(
1086
504
                rx,
1087
504
                self.weak_self.clone(),
1088
504
                self.no_event_action_timeout,
1089
504
                self.now_fn.clone(),
1090
504
            ))
1091
504
        })
1092
        .await
1093
504
    }
1094
1095
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
1096
0
        None
1097
0
    }
1098
}
1099
1100
#[async_trait]
1101
impl<T, I, NowFn> WorkerStateManager for SimpleSchedulerStateManager<T, I, NowFn>
1102
where
1103
    T: AwaitedActionDb,
1104
    I: InstantWrapper,
1105
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
1106
{
1107
    async fn update_operation(
1108
        &self,
1109
        operation_id: &OperationId,
1110
        worker_id: &WorkerId,
1111
        update: UpdateOperationType,
1112
17
    ) -> Result<(), Error> {
1113
        self.inner_update_operation(operation_id, Some(worker_id), update)
1114
            .await
1115
17
    }
1116
}
1117
1118
#[async_trait]
1119
impl<T, I, NowFn> MatchingEngineStateManager for SimpleSchedulerStateManager<T, I, NowFn>
1120
where
1121
    T: AwaitedActionDb,
1122
    I: InstantWrapper,
1123
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
1124
{
1125
    async fn filter_operations<'a>(
1126
        &'a self,
1127
        filter: OperationFilter,
1128
112
    ) -> Result<ActionStateResultStream<'a>, Error> {
1129
50
        self.inner_filter_operations(filter, |rx| {
1130
50
            Box::new(MatchingEngineActionStateResult::new(
1131
50
                rx,
1132
50
                self.weak_self.clone(),
1133
50
                self.no_event_action_timeout,
1134
50
                self.now_fn.clone(),
1135
50
            ))
1136
50
        })
1137
        .await
1138
112
    }
1139
1140
    async fn assign_operation(
1141
        &self,
1142
        operation_id: &OperationId,
1143
        worker_id_or_reason_for_unassign: Result<&WorkerId, Error>,
1144
33
    ) -> Result<(), Error> {
1145
        let (maybe_worker_id, update) = match worker_id_or_reason_for_unassign {
1146
            Ok(worker_id) => (
1147
                Some(worker_id),
1148
                UpdateOperationType::UpdateWithActionStage(ActionStage::Executing),
1149
            ),
1150
            Err(err) => (None, UpdateOperationType::UpdateWithError(err)),
1151
        };
1152
        self.inner_update_operation(operation_id, maybe_worker_id, update)
1153
            .await
1154
33
    }
1155
}