Coverage Report

Created: 2026-05-23 21:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/simple_scheduler_state_manager.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::ops::Bound;
16
use core::time::Duration;
17
use std::string::ToString;
18
use std::sync::{Arc, Weak};
19
20
use async_lock::Mutex;
21
use async_trait::async_trait;
22
use futures::{StreamExt, TryStreamExt, stream};
23
use nativelink_error::{Code, Error, ResultExt, make_err};
24
use nativelink_metric::MetricsComponent;
25
use nativelink_util::action_messages::{
26
    ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ExecutionMetadata,
27
    OperationId, WorkerId,
28
};
29
use nativelink_util::instant_wrapper::InstantWrapper;
30
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
31
use nativelink_util::metrics::{
32
    EXECUTION_METRICS, EXECUTION_RESULT, EXECUTION_STAGE, ExecutionResult, ExecutionStage,
33
};
34
use nativelink_util::operation_state_manager::{
35
    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
36
    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType, WorkerStateManager,
37
};
38
use nativelink_util::origin_event::OriginMetadata;
39
use opentelemetry::KeyValue;
40
use tracing::{debug, info, trace, warn};
41
42
use super::awaited_action_db::{
43
    AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedActionState,
44
};
45
use crate::worker_registry::SharedWorkerRegistry;
46
47
/// Maximum number of times an update to the database
48
/// can fail before giving up.
49
const MAX_UPDATE_RETRIES: usize = 5;
50
51
/// Base delay for exponential backoff on version conflicts (in ms).
52
const BASE_RETRY_DELAY_MS: u64 = 10;
53
54
/// Maximum jitter to add to retry delay (in ms).
55
const MAX_RETRY_JITTER_MS: u64 = 20;
56
57
/// Simple struct that implements the `ActionStateResult` trait and always returns an error.
58
struct ErrorActionStateResult(Error);
59
60
#[async_trait]
61
impl ActionStateResult for ErrorActionStateResult {
62
0
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
63
        Err(self.0.clone())
64
0
    }
65
66
0
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
67
        Err(self.0.clone())
68
0
    }
69
70
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
71
        Err(self.0.clone())
72
0
    }
73
}
74
75
struct ClientActionStateResult<U, T, I, NowFn>
76
where
77
    U: AwaitedActionSubscriber,
78
    T: AwaitedActionDb,
79
    I: InstantWrapper,
80
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
81
{
82
    inner: MatchingEngineActionStateResult<U, T, I, NowFn>,
83
}
84
85
impl<U, T, I, NowFn> ClientActionStateResult<U, T, I, NowFn>
86
where
87
    U: AwaitedActionSubscriber,
88
    T: AwaitedActionDb,
89
    I: InstantWrapper,
90
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
91
{
92
536
    const fn new(
93
536
        sub: U,
94
536
        simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
95
536
        no_event_action_timeout: Duration,
96
536
        now_fn: NowFn,
97
536
    ) -> Self {
98
536
        Self {
99
536
            inner: MatchingEngineActionStateResult::new(
100
536
                sub,
101
536
                simple_scheduler_state_manager,
102
536
                no_event_action_timeout,
103
536
                now_fn,
104
536
            ),
105
536
        }
106
536
    }
107
}
108
109
#[async_trait]
110
impl<U, T, I, NowFn> ActionStateResult for ClientActionStateResult<U, T, I, NowFn>
111
where
112
    U: AwaitedActionSubscriber,
113
    T: AwaitedActionDb,
114
    I: InstantWrapper,
115
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
116
{
117
6
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
118
        self.inner.as_state().await
119
6
    }
120
121
54
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
122
        self.inner.changed().await
123
54
    }
124
125
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
126
        self.inner.as_action_info().await
127
0
    }
128
}
129
130
struct MatchingEngineActionStateResult<U, T, I, NowFn>
131
where
132
    U: AwaitedActionSubscriber,
133
    T: AwaitedActionDb,
134
    I: InstantWrapper,
135
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
136
{
137
    awaited_action_sub: U,
138
    simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
139
    no_event_action_timeout: Duration,
140
    now_fn: NowFn,
141
}
142
impl<U, T, I, NowFn> MatchingEngineActionStateResult<U, T, I, NowFn>
143
where
144
    U: AwaitedActionSubscriber,
145
    T: AwaitedActionDb,
146
    I: InstantWrapper,
147
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
148
{
149
593
    const fn new(
150
593
        awaited_action_sub: U,
151
593
        simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
152
593
        no_event_action_timeout: Duration,
153
593
        now_fn: NowFn,
154
593
    ) -> Self {
155
593
        Self {
156
593
            awaited_action_sub,
157
593
            simple_scheduler_state_manager,
158
593
            no_event_action_timeout,
159
593
            now_fn,
160
593
        }
161
593
    }
162
}
163
164
#[async_trait]
165
impl<U, T, I, NowFn> ActionStateResult for MatchingEngineActionStateResult<U, T, I, NowFn>
166
where
167
    U: AwaitedActionSubscriber,
168
    T: AwaitedActionDb,
169
    I: InstantWrapper,
170
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
171
{
172
44
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
173
        let awaited_action = self
174
            .awaited_action_sub
175
            .borrow()
176
            .await
177
            .err_tip(|| "In MatchingEngineActionStateResult::as_state")?;
178
        Ok((
179
            awaited_action.state().clone(),
180
            awaited_action.maybe_origin_metadata().cloned(),
181
        ))
182
44
    }
183
184
54
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
185
        let mut timeout_attempts = 0;
186
        loop {
187
            tokio::select! {
188
                awaited_action_result = self.awaited_action_sub.changed() => {
189
                    return awaited_action_result
190
                        .err_tip(|| "In MatchingEngineActionStateResult::changed")
191
50
                        .map(|v| (v.state().clone(), v.maybe_origin_metadata().cloned()));
192
                }
193
                () = (self.now_fn)().sleep(self.no_event_action_timeout) => {
194
                    // Timeout happened, do additional checks below.
195
                }
196
            }
197
198
            let awaited_action = self
199
                .awaited_action_sub
200
                .borrow()
201
                .await
202
                .err_tip(|| "In MatchingEngineActionStateResult::changed")?;
203
204
            if matches!(awaited_action.state().stage, ActionStage::Queued) {
205
                // Actions in queued state do not get periodically updated,
206
                // so we don't need to timeout them.
207
                continue;
208
            }
209
210
            let simple_scheduler_state_manager = self
211
                .simple_scheduler_state_manager
212
                .upgrade()
213
0
                .err_tip(|| format!("Failed to upgrade weak reference to SimpleSchedulerStateManager in MatchingEngineActionStateResult::changed at attempt: {timeout_attempts}"))?;
214
215
            // Check if worker is alive via registry before timing out.
216
            let should_timeout = simple_scheduler_state_manager
217
                .should_timeout_operation(&awaited_action)
218
                .await;
219
220
            if !should_timeout {
221
                // Worker is alive, continue waiting for updates
222
                trace!(
223
                    operation_id = %awaited_action.operation_id(),
224
                    "Operation timeout check passed, worker is alive"
225
                );
226
                continue;
227
            }
228
229
            warn!(
230
                ?awaited_action,
231
                "OperationId {} / {} timed out after {} seconds issuing a retry",
232
                awaited_action.operation_id(),
233
                awaited_action.state().client_operation_id,
234
                self.no_event_action_timeout.as_secs_f32(),
235
            );
236
237
            simple_scheduler_state_manager
238
                .timeout_operation_id(awaited_action.operation_id())
239
                .await
240
                .err_tip(|| "In MatchingEngineActionStateResult::changed")?;
241
242
            if timeout_attempts >= MAX_UPDATE_RETRIES {
243
                return Err(make_err!(
244
                    Code::Internal,
245
                    "Failed to update action after {} retries with no error set in MatchingEngineActionStateResult::changed - {} {:?}",
246
                    MAX_UPDATE_RETRIES,
247
                    awaited_action.operation_id(),
248
                    awaited_action.state().stage,
249
                ));
250
            }
251
            timeout_attempts += 1;
252
        }
253
54
    }
254
255
57
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
256
        let awaited_action = self
257
            .awaited_action_sub
258
            .borrow()
259
            .await
260
            .err_tip(|| "In MatchingEngineActionStateResult::as_action_info")?;
261
        Ok((
262
            awaited_action.action_info().clone(),
263
            awaited_action.maybe_origin_metadata().cloned(),
264
        ))
265
57
    }
266
}
267
268
/// `SimpleSchedulerStateManager` is responsible for maintaining the state of the scheduler.
269
/// Scheduler state includes the actions that are queued, active, and recently completed.
270
/// It also includes the workers that are available to execute actions based on allocation
271
/// strategy.
272
#[derive(MetricsComponent, Debug)]
273
pub struct SimpleSchedulerStateManager<T, I, NowFn>
274
where
275
    T: AwaitedActionDb,
276
    I: InstantWrapper,
277
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
278
{
279
    /// Database for storing the state of all actions.
280
    #[metric(group = "action_db")]
281
    action_db: T,
282
283
    /// Maximum number of times a job can be retried.
284
    // TODO(palfrey) This should be a scheduler decorator instead
285
    // of always having it on every SimpleScheduler.
286
    #[metric(help = "Maximum number of times a job can be retried")]
287
    max_job_retries: usize,
288
289
    /// Duration after which an action is considered to be timed out if
290
    /// no event is received.
291
    #[metric(
292
        help = "Duration after which an action is considered to be timed out if no event is received"
293
    )]
294
    no_event_action_timeout: Duration,
295
296
    /// Mark an unfinished operation as timed out if no client keepalive has
297
    /// been recorded for it within this duration. It is compared against the
298
    /// action's last client keepalive timestamp when operations are filtered.
299
    client_action_timeout: Duration,
300
301
    /// Maximum time an action can stay in Executing state without any worker
302
    /// update, regardless of worker keepalive status. `Duration::ZERO` disables.
303
    max_executing_timeout: Duration,
304
305
    // A lock to ensure only one timeout operation is running at a time
306
    // on this service.
307
    timeout_operation_mux: Mutex<()>,
308
309
    /// Weak reference to self.
310
    // We use a weak reference to reduce the risk of a memory leak from
311
    // future changes. If this becomes some kind of performance issue,
312
    // we can consider using a strong reference.
313
    weak_self: Weak<Self>,
314
315
    /// Function to get the current time.
316
    now_fn: NowFn,
317
318
    /// Worker registry for checking worker liveness.
319
    worker_registry: Option<SharedWorkerRegistry>,
320
}
321
322
impl<T, I, NowFn> SimpleSchedulerStateManager<T, I, NowFn>
323
where
324
    T: AwaitedActionDb,
325
    I: InstantWrapper,
326
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
327
{
328
28
    pub fn new(
329
28
        max_job_retries: usize,
330
28
        no_event_action_timeout: Duration,
331
28
        client_action_timeout: Duration,
332
28
        max_executing_timeout: Duration,
333
28
        action_db: T,
334
28
        now_fn: NowFn,
335
28
        worker_registry: Option<SharedWorkerRegistry>,
336
28
    ) -> Arc<Self> {
337
28
        Arc::new_cyclic(|weak_self| Self {
338
28
            action_db,
339
28
            max_job_retries,
340
28
            no_event_action_timeout,
341
28
            client_action_timeout,
342
28
            max_executing_timeout,
343
28
            timeout_operation_mux: Mutex::new(()),
344
28
            weak_self: weak_self.clone(),
345
28
            now_fn,
346
28
            worker_registry,
347
28
        })
348
28
    }
349
350
3
    pub async fn should_timeout_operation(&self, awaited_action: &AwaitedAction) -> bool {
351
3
        if !
matches!0
(awaited_action.state().stage, ActionStage::Executing) {
352
0
            return false;
353
3
        }
354
355
3
        let now = (self.now_fn)().now();
356
357
        // Honor the per-action `Action.timeout` from the RBE protocol as a
358
        // backend wall-clock deadline. Without this, the only enforcement is
359
        // the Bazel client's --test_timeout, which surfaces as TIMEOUT/NO
360
        // STATUS instead of a backend signal pointing at the worker.
361
3
        let action_timeout = awaited_action.action_info().timeout;
362
3
        if action_timeout > Duration::ZERO {
363
3
            let executing_started_at = awaited_action.state().last_transition_timestamp;
364
3
            if let Ok(elapsed) = now.duration_since(executing_started_at)
365
3
                && elapsed > action_timeout
366
            {
367
1
                return true;
368
2
            }
369
0
        }
370
371
2
        let registry_alive = if let Some(
ref worker_registry1
) = self.worker_registry {
372
1
            if let Some(worker_id) = awaited_action.worker_id() {
373
1
                worker_registry
374
1
                    .is_worker_alive(worker_id, self.no_event_action_timeout, now)
375
1
                    .await
376
            } else {
377
0
                false
378
            }
379
        } else {
380
1
            false
381
        };
382
383
2
        if registry_alive {
384
0
            if self.max_executing_timeout > Duration::ZERO {
385
0
                let last_update = awaited_action.last_worker_updated_timestamp();
386
0
                if let Ok(elapsed) = now.duration_since(last_update) {
387
0
                    return elapsed > self.max_executing_timeout;
388
0
                }
389
0
            }
390
0
            return false;
391
2
        }
392
393
2
        let worker_should_update_before = awaited_action
394
2
            .last_worker_updated_timestamp()
395
2
            .checked_add(self.no_event_action_timeout)
396
2
            .unwrap_or(now);
397
398
2
        worker_should_update_before < now
399
3
    }
400
401
561
    async fn apply_filter_predicate(
402
561
        &self,
403
561
        awaited_action: &AwaitedAction,
404
561
        subscriber: &T::Subscriber,
405
561
        filter: &OperationFilter,
406
561
    ) -> bool {
407
        // Note: The caller must filter `client_operation_id`.
408
409
561
        let mut maybe_reloaded_awaited_action: Option<AwaitedAction> = None;
410
561
        let now = (self.now_fn)().now();
411
412
        // Check if client has timed out
413
561
        if awaited_action.last_client_keepalive_timestamp() + self.client_action_timeout < now {
414
            // This may change if the version is out of date.
415
0
            let mut timed_out = true;
416
0
            if !awaited_action.state().stage.is_finished() {
417
0
                let mut state = awaited_action.state().as_ref().clone();
418
0
                warn!(operation_id = ?awaited_action.operation_id(), timeout_secs = self.client_action_timeout.as_secs_f32(), "Operation timed out having no more clients listening");
419
0
                state.stage = ActionStage::Completed(ActionResult {
420
0
                    error: Some(make_err!(
421
0
                        Code::DeadlineExceeded,
422
0
                        "Operation timed out {} seconds of having no more clients listening",
423
0
                        self.client_action_timeout.as_secs_f32(),
424
0
                    )),
425
0
                    ..ActionResult::default()
426
0
                });
427
0
                state.last_transition_timestamp = now;
428
0
                let state = Arc::new(state);
429
                // We may be competing with an client timestamp update, so try
430
                // this a few times.
431
0
                for attempt in 1..=MAX_UPDATE_RETRIES {
432
0
                    let mut new_awaited_action = match &maybe_reloaded_awaited_action {
433
0
                        None => awaited_action.clone(),
434
0
                        Some(reloaded_awaited_action) => reloaded_awaited_action.clone(),
435
                    };
436
0
                    new_awaited_action.worker_set_state(state.clone(), (self.now_fn)().now());
437
0
                    let err = match self
438
0
                        .action_db
439
0
                        .update_awaited_action(new_awaited_action)
440
0
                        .await
441
                    {
442
0
                        Ok(()) => break,
443
0
                        Err(err) => err,
444
                    };
445
                    // Reload from the database if the action was outdated.
446
0
                    let maybe_awaited_action =
447
0
                        if attempt == MAX_UPDATE_RETRIES || err.code != Code::Aborted {
448
0
                            None
449
                        } else {
450
0
                            subscriber.borrow().await.ok()
451
                        };
452
0
                    if let Some(reloaded_awaited_action) = maybe_awaited_action {
453
0
                        maybe_reloaded_awaited_action = Some(reloaded_awaited_action);
454
0
                    } else {
455
0
                        warn!(
456
                            "Failed to update action to timed out state after client keepalive timeout. This is ok if multiple schedulers tried to set the state at the same time: {err}",
457
                        );
458
0
                        break;
459
                    }
460
                    // Re-check the predicate after reload.
461
0
                    if maybe_reloaded_awaited_action
462
0
                        .as_ref()
463
0
                        .is_some_and(|awaited_action| {
464
0
                            awaited_action.last_client_keepalive_timestamp()
465
0
                                + self.client_action_timeout
466
0
                                >= (self.now_fn)().now()
467
0
                        })
468
                    {
469
0
                        timed_out = false;
470
0
                        break;
471
0
                    } else if maybe_reloaded_awaited_action
472
0
                        .as_ref()
473
0
                        .is_some_and(|awaited_action| awaited_action.state().stage.is_finished())
474
                    {
475
0
                        break;
476
0
                    }
477
                }
478
0
            }
479
0
            if timed_out {
480
0
                return false;
481
0
            }
482
561
        }
483
        // If the action was reloaded, then use that for the rest of the checks
484
        // instead of the input parameter.
485
561
        let awaited_action = maybe_reloaded_awaited_action
486
561
            .as_ref()
487
561
            .unwrap_or(awaited_action);
488
489
561
        if let Some(
operation_id0
) = &filter.operation_id
490
0
            && operation_id != awaited_action.operation_id()
491
        {
492
0
            return false;
493
561
        }
494
495
561
        if filter.worker_id.is_some() && 
filter.worker_id.as_ref()0
!= awaited_action.worker_id() {
496
0
            return false;
497
561
        }
498
499
        {
500
561
            if let Some(
filter_unique_key0
) = &filter.unique_key {
501
0
                match &awaited_action.action_info().unique_qualifier {
502
0
                    ActionUniqueQualifier::Cacheable(unique_key) => {
503
0
                        if filter_unique_key != unique_key {
504
0
                            return false;
505
0
                        }
506
                    }
507
                    ActionUniqueQualifier::Uncacheable(_) => {
508
0
                        return false;
509
                    }
510
                }
511
561
            }
512
561
            if let Some(
action_digest0
) = filter.action_digest
513
0
                && action_digest != awaited_action.action_info().digest()
514
            {
515
0
                return false;
516
561
            }
517
        }
518
519
        {
520
561
            let last_worker_update_timestamp = awaited_action.last_worker_updated_timestamp();
521
561
            if let Some(
worker_update_before0
) = filter.worker_update_before
522
0
                && worker_update_before < last_worker_update_timestamp
523
            {
524
0
                return false;
525
561
            }
526
561
            if let Some(
completed_before0
) = filter.completed_before
527
0
                && awaited_action.state().stage.is_finished()
528
0
                && completed_before < last_worker_update_timestamp
529
            {
530
0
                return false;
531
561
            }
532
561
            if filter.stages != OperationStageFlags::Any {
533
557
                let stage_flag = match awaited_action.state().stage {
534
0
                    ActionStage::Unknown => OperationStageFlags::Any,
535
0
                    ActionStage::CacheCheck => OperationStageFlags::CacheCheck,
536
557
                    ActionStage::Queued => OperationStageFlags::Queued,
537
0
                    ActionStage::Executing => OperationStageFlags::Executing,
538
                    ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => {
539
0
                        OperationStageFlags::Completed
540
                    }
541
                };
542
557
                if !filter.stages.intersects(stage_flag) {
543
0
                    return false;
544
557
                }
545
4
            }
546
        }
547
548
561
        true
549
561
    }
550
551
    /// Let the scheduler know that an operation has timed out from
552
    /// the client side (ie: worker has not updated in a while).
553
1
    async fn timeout_operation_id(&self, operation_id: &OperationId) -> Result<(), Error> {
554
        // Ensure that only one timeout operation is running at a time.
555
        // Failing to do this could result in the same operation being
556
        // timed out multiple times at the same time.
557
        // Note: We could implement this on a per-operation_id basis, but it is quite
558
        // complex to manage the locks.
559
1
        let _lock = self.timeout_operation_mux.lock().await;
560
561
1
        let awaited_action_subscriber = self
562
1
            .action_db
563
1
            .get_by_operation_id(operation_id)
564
1
            .await
565
1
            .err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")
?0
566
1
            .err_tip(|| 
{0
567
0
                format!("Operation id {operation_id} does not exist in SimpleSchedulerStateManager::timeout_operation_id")
568
0
            })?;
569
570
1
        let awaited_action = awaited_action_subscriber
571
1
            .borrow()
572
1
            .await
573
1
            .err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")
?0
;
574
575
        // If the action is not executing, we should not timeout the action.
576
1
        if !
matches!0
(awaited_action.state().stage, ActionStage::Executing) {
577
0
            return Ok(());
578
1
        }
579
580
1
        let now = (self.now_fn)().now();
581
582
        // Check worker liveness via registry if available.
583
1
        let registry_alive = if let Some(ref worker_registry) = self.worker_registry {
584
1
            if let Some(worker_id) = awaited_action.worker_id() {
585
1
                worker_registry
586
1
                    .is_worker_alive(worker_id, self.no_event_action_timeout, now)
587
1
                    .await
588
            } else {
589
0
                false
590
            }
591
        } else {
592
0
            false
593
        };
594
595
1
        let timestamp_alive = {
596
1
            let worker_should_update_before = awaited_action
597
1
                .last_worker_updated_timestamp()
598
1
                .checked_add(self.no_event_action_timeout)
599
1
                .unwrap_or(now);
600
1
            worker_should_update_before >= now
601
        };
602
603
1
        if registry_alive || timestamp_alive {
604
0
            trace!(
605
                %operation_id,
606
0
                worker_id = ?awaited_action.worker_id(),
607
                registry_alive,
608
                timestamp_alive,
609
                "Worker is alive, operation not timed out"
610
            );
611
0
            return Ok(());
612
1
        }
613
614
1
        warn!(
615
            %operation_id,
616
1
            worker_id = ?awaited_action.worker_id(),
617
            registry_alive,
618
            timestamp_alive,
619
            "Worker not alive via registry or timestamp, timing out operation"
620
        );
621
622
1
        self.assign_operation(
623
1
            operation_id,
624
1
            Err(make_err!(
625
1
                Code::DeadlineExceeded,
626
1
                "Operation timed out after {} seconds",
627
1
                self.no_event_action_timeout.as_secs_f32(),
628
1
            )),
629
1
        )
630
1
        .await
631
1
    }
632
633
58
    async fn inner_update_operation(
634
58
        &self,
635
58
        operation_id: &OperationId,
636
58
        maybe_worker_id: Option<&WorkerId>,
637
58
        update: UpdateOperationType,
638
58
    ) -> Result<(), Error> {
639
58
        let update_type_str = match &update {
640
0
            UpdateOperationType::KeepAlive => "KeepAlive",
641
43
            UpdateOperationType::UpdateWithActionStage(stage) => match stage {
642
0
                ActionStage::Queued => "Stage:Queued",
643
38
                ActionStage::Executing => "Stage:Executing",
644
5
                ActionStage::Completed(_) => "Stage:Completed",
645
0
                ActionStage::CompletedFromCache(_) => "Stage:CompletedFromCache",
646
0
                ActionStage::CacheCheck => "Stage:CacheCheck",
647
0
                ActionStage::Unknown => "Stage:Unknown",
648
            },
649
12
            UpdateOperationType::UpdateWithError(_) => "Error",
650
2
            UpdateOperationType::UpdateWithDisconnect => "Disconnect",
651
1
            UpdateOperationType::ExecutionComplete => "ExecutionComplete",
652
        };
653
654
58
        debug!(
655
            %operation_id,
656
            ?maybe_worker_id,
657
            update_type = %update_type_str,
658
            "inner_update_operation START"
659
        );
660
661
58
        let mut last_err = None;
662
58
        let mut retry_count = 0;
663
58
        for _ in 0..MAX_UPDATE_RETRIES {
664
62
            retry_count += 1;
665
62
            if retry_count > 1 {
666
4
                let base_delay = BASE_RETRY_DELAY_MS * (1 << (retry_count - 2).min(4));
667
4
                let jitter = std::time::SystemTime::now()
668
4
                    .duration_since(std::time::UNIX_EPOCH)
669
4
                    .map_or(0, |d| {
670
4
                        u64::try_from(d.as_nanos()).expect("u64 error") % MAX_RETRY_JITTER_MS
671
4
                    });
672
4
                let delay = Duration::from_millis(base_delay + jitter);
673
674
4
                warn!(
675
                    %operation_id,
676
                    ?maybe_worker_id,
677
                    retry_count,
678
4
                    delay_ms = delay.as_millis(),
679
                    update_type = %update_type_str,
680
                    "Retrying operation update due to version conflict (with backoff)"
681
                );
682
683
4
                tokio::time::sleep(delay).await;
684
58
            }
685
62
            let maybe_awaited_action_subscriber = self
686
62
                .action_db
687
62
                .get_by_operation_id(operation_id)
688
62
                .await
689
62
                .err_tip(|| "In SimpleSchedulerStateManager::update_operation")
?0
;
690
62
            let Some(
awaited_action_subscriber60
) = maybe_awaited_action_subscriber else {
691
                // No action found. It is ok if the action was not found. It
692
                // probably means that the action was dropped, but worker was
693
                // still processing it.
694
2
                warn!(
695
                    %operation_id,
696
                    "Unable to update action due to it being missing, probably dropped"
697
                );
698
2
                return Ok(());
699
            };
700
701
60
            let mut awaited_action = awaited_action_subscriber
702
60
                .borrow()
703
60
                .await
704
60
                .err_tip(|| "In SimpleSchedulerStateManager::update_operation")
?0
;
705
706
            // Make sure the worker id matches the awaited action worker id.
707
            // This might happen if the worker sending the update is not the
708
            // worker that was assigned.
709
60
            if awaited_action.worker_id().is_some()
710
23
                && maybe_worker_id.is_some()
711
22
                && maybe_worker_id != awaited_action.worker_id()
712
            {
713
                // If another worker is already assigned to the action, another
714
                // worker probably picked up the action. We should not update the
715
                // action in this case and abort this operation.
716
0
                let err = make_err!(
717
0
                    Code::Aborted,
718
                    "Worker ids do not match - {:?} != {:?} for {:?}",
719
                    maybe_worker_id,
720
0
                    awaited_action.worker_id(),
721
                    awaited_action,
722
                );
723
0
                info!(
724
                    "Worker ids do not match - {:?} != {:?} for {:?}. This is probably due to another worker picking up the action.",
725
                    maybe_worker_id,
726
0
                    awaited_action.worker_id(),
727
                    awaited_action,
728
                );
729
0
                return Err(err);
730
60
            }
731
732
            // Make sure we don't update an action that is already completed.
733
60
            if awaited_action.state().stage.is_finished() {
734
0
                match &update {
735
                    UpdateOperationType::UpdateWithDisconnect | UpdateOperationType::KeepAlive => {
736
                        // No need to error a keep-alive when it's completed, it's just
737
                        // unnecessary log noise.
738
0
                        return Ok(());
739
                    }
740
                    _ => {
741
0
                        return Err(make_err!(
742
0
                            Code::Internal,
743
0
                            "Action {operation_id} is already completed with state {:?} - maybe_worker_id: {:?}",
744
0
                            awaited_action.state().stage,
745
0
                            maybe_worker_id,
746
0
                        ));
747
                    }
748
                }
749
60
            }
750
751
60
            let 
stage56
= match &update {
752
                UpdateOperationType::KeepAlive => {
753
0
                    awaited_action.worker_keep_alive((self.now_fn)().now());
754
0
                    match self
755
0
                        .action_db
756
0
                        .update_awaited_action(awaited_action)
757
0
                        .await
758
0
                        .err_tip(|| "Failed to send KeepAlive in SimpleSchedulerStateManager::update_operation") {
759
                        // Try again if there was a version mismatch.
760
0
                        Err(err) if err.code == Code::Aborted => {
761
0
                            last_err = Some(err);
762
0
                            continue;
763
                        }
764
0
                        result => return result,
765
                    }
766
                }
767
46
                UpdateOperationType::UpdateWithActionStage(stage) => {
768
46
                    if stage == &ActionStage::Executing
769
41
                        && awaited_action.state().stage == ActionStage::Executing
770
                    {
771
4
                        warn!(state = ?awaited_action.state(), "Action already assigned");
772
4
                        return Err(make_err!(Code::Aborted, "Action already assigned"));
773
42
                    }
774
42
                    stage.clone()
775
                }
776
12
                UpdateOperationType::UpdateWithError(err) => {
777
                    // Don't count a backpressure failure as an attempt for an action.
778
12
                    let due_to_backpressure = err.code == Code::ResourceExhausted;
779
12
                    if !due_to_backpressure {
780
12
                        awaited_action.attempts += 1;
781
12
                    
}0
782
783
12
                    if awaited_action.attempts > self.max_job_retries {
784
2
                        ActionStage::Completed(ActionResult {
785
2
                            execution_metadata: ExecutionMetadata {
786
2
                                worker: maybe_worker_id.map_or_else(String::default, ToString::to_string),
787
2
                                ..ExecutionMetadata::default()
788
2
                            },
789
2
                            error: Some(err.clone().merge(make_err!(
790
2
                                Code::Internal,
791
2
                                "Job cancelled because it attempted to execute too many times {} > {} times {}",
792
2
                                awaited_action.attempts,
793
2
                                self.max_job_retries,
794
2
                                format!("for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"),
795
2
                            ))),
796
2
                            ..ActionResult::default()
797
2
                        })
798
                    } else {
799
10
                        ActionStage::Queued
800
                    }
801
                }
802
                UpdateOperationType::UpdateWithDisconnect => {
803
                    // A worker disconnect (e.g. OOMKill, pod eviction, network
804
                    // drop) used to requeue without counting as an attempt,
805
                    // which let an action that always crashes its worker loop
806
                    // forever until the Bazel client's --test_timeout fired.
807
                    // Count disconnects as attempts so max_job_retries caps the
808
                    // loop and the client sees a backend-attributable error.
809
2
                    awaited_action.attempts += 1;
810
811
2
                    if awaited_action.attempts > self.max_job_retries {
812
1
                        ActionStage::Completed(ActionResult {
813
1
                            execution_metadata: ExecutionMetadata {
814
1
                                worker: maybe_worker_id
815
1
                                    .map_or_else(String::default, ToString::to_string),
816
1
                                ..ExecutionMetadata::default()
817
1
                            },
818
1
                            error: Some(make_err!(
819
1
                                Code::Internal,
820
1
                                "Worker disconnected repeatedly while executing this action ({} > {} attempts); the runner likely OOMKilled or the pod was evicted. {}",
821
1
                                awaited_action.attempts,
822
1
                                self.max_job_retries,
823
1
                                format!(
824
1
                                    "for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"
825
1
                                ),
826
1
                            )),
827
1
                            ..ActionResult::default()
828
1
                        })
829
                    } else {
830
1
                        ActionStage::Queued
831
                    }
832
                }
833
                // We shouldn't get here, but we just ignore it if we do.
834
                UpdateOperationType::ExecutionComplete => {
835
0
                    warn!("inner_update_operation got an ExecutionComplete, that's unexpected.");
836
0
                    return Ok(());
837
                }
838
            };
839
56
            let now = (self.now_fn)().now();
840
56
            if 
matches!45
(stage, ActionStage::Queued) {
841
11
                // If the action is queued, we need to unset the worker id regardless of
842
11
                // which worker sent the update.
843
11
                awaited_action.set_worker_id(None, now);
844
45
            } else {
845
45
                awaited_action.set_worker_id(maybe_worker_id.cloned(), now);
846
45
            }
847
56
            awaited_action.worker_set_state(
848
56
                Arc::new(ActionState {
849
56
                    stage,
850
56
                    // Client id is not known here, it is the responsibility of
851
56
                    // the subscriber impl to replace this with the
852
56
                    // correct client id.
853
56
                    client_operation_id: operation_id.clone(),
854
56
                    action_digest: awaited_action.action_info().digest(),
855
56
                    last_transition_timestamp: now,
856
56
                }),
857
56
                now,
858
            );
859
860
56
            let update_action_result = self
861
56
                .action_db
862
56
                .update_awaited_action(awaited_action.clone())
863
56
                .await
864
56
                .err_tip(|| "In SimpleSchedulerStateManager::update_operation");
865
56
            if let Err(
err5
) = update_action_result {
866
                // We use Aborted to signal that the action was not
867
                // updated due to the data being set was not the latest
868
                // but can be retried.
869
5
                if err.code == Code::Aborted {
870
4
                    debug!(
871
                        %operation_id,
872
                        retry_count,
873
                        update_type = %update_type_str,
874
                        "Version conflict (Aborted), will retry"
875
                    );
876
4
                    last_err = Some(err);
877
4
                    continue;
878
1
                }
879
1
                warn!(
880
                    %operation_id,
881
                    update_type = %update_type_str,
882
                    ?err,
883
                    "inner_update_operation FAILED (non-retryable)"
884
                );
885
1
                return Err(err);
886
51
            }
887
888
            // Record execution metrics after successful state update
889
51
            let action_state = awaited_action.state();
890
51
            let instance_name = awaited_action
891
51
                .action_info()
892
51
                .unique_qualifier
893
51
                .instance_name()
894
51
                .as_str();
895
51
            let worker_id = awaited_action
896
51
                .worker_id()
897
51
                .map(std::string::ToString::to_string);
898
51
            let priority = Some(awaited_action.action_info().priority);
899
900
            // Build base attributes for metrics
901
51
            let mut attrs = nativelink_util::metrics::make_execution_attributes(
902
51
                instance_name,
903
51
                worker_id.as_deref(),
904
51
                priority,
905
            );
906
907
            // Add stage attribute
908
51
            let execution_stage: ExecutionStage = (&action_state.stage).into();
909
51
            attrs.push(KeyValue::new(EXECUTION_STAGE, execution_stage));
910
911
            // Record stage transition
912
51
            EXECUTION_METRICS.execution_stage_transitions.add(1, &attrs);
913
914
            // For completed actions, record the completion count with result
915
51
            match &action_state.stage {
916
8
                ActionStage::Completed(action_result) => {
917
8
                    let result = if action_result.exit_code == 0 {
918
5
                        ExecutionResult::Success
919
                    } else {
920
3
                        ExecutionResult::Failure
921
                    };
922
8
                    attrs.push(KeyValue::new(EXECUTION_RESULT, result));
923
8
                    EXECUTION_METRICS.execution_completed_count.add(1, &attrs);
924
                }
925
0
                ActionStage::CompletedFromCache(_) => {
926
0
                    attrs.push(KeyValue::new(EXECUTION_RESULT, ExecutionResult::CacheHit));
927
0
                    EXECUTION_METRICS.execution_completed_count.add(1, &attrs);
928
0
                }
929
43
                _ => {}
930
            }
931
932
51
            debug!(
933
                %operation_id,
934
                retry_count,
935
                update_type = %update_type_str,
936
                "inner_update_operation SUCCESS"
937
            );
938
51
            return Ok(());
939
        }
940
941
0
        warn!(
942
            %operation_id,
943
            update_type = %update_type_str,
944
            retry_count = MAX_UPDATE_RETRIES,
945
            "inner_update_operation EXHAUSTED all retries"
946
        );
947
0
        Err(last_err.unwrap_or_else(|| {
948
0
            make_err!(
949
0
                Code::Internal,
950
                "Failed to update action after {} retries with no error set",
951
                MAX_UPDATE_RETRIES,
952
            )
953
0
        }))
954
58
    }
955
956
32
    async fn inner_add_operation(
957
32
        &self,
958
32
        new_client_operation_id: OperationId,
959
32
        action_info: Arc<ActionInfo>,
960
32
    ) -> Result<T::Subscriber, Error> {
961
32
        self.action_db
962
32
            .add_action(
963
32
                new_client_operation_id,
964
32
                action_info,
965
32
                self.no_event_action_timeout,
966
32
            )
967
32
            .await
968
32
            .err_tip(|| "In SimpleSchedulerStateManager::add_operation")
969
32
    }
970
971
626
    async fn inner_filter_operations<'a, F>(
972
626
        &'a self,
973
626
        filter: OperationFilter,
974
626
        to_action_state_result: F,
975
626
    ) -> Result<ActionStateResultStream<'a>, Error>
976
626
    where
977
626
        F: Fn(T::Subscriber) -> Box<dyn ActionStateResult> + Send + Sync + 'a,
978
626
    {
979
622
        const fn sorted_awaited_action_state_for_flags(
980
622
            stage: OperationStageFlags,
981
622
        ) -> Option<SortedAwaitedActionState> {
982
622
            match stage {
983
0
                OperationStageFlags::CacheCheck => Some(SortedAwaitedActionState::CacheCheck),
984
620
                OperationStageFlags::Queued => Some(SortedAwaitedActionState::Queued),
985
0
                OperationStageFlags::Executing => Some(SortedAwaitedActionState::Executing),
986
0
                OperationStageFlags::Completed => Some(SortedAwaitedActionState::Completed),
987
2
                _ => None,
988
            }
989
622
        }
990
991
626
        if let Some(
operation_id0
) = &filter.operation_id {
992
0
            let maybe_subscriber = self
993
0
                .action_db
994
0
                .get_by_operation_id(operation_id)
995
0
                .await
996
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
997
0
            let Some(subscriber) = maybe_subscriber else {
998
0
                return Ok(Box::pin(stream::empty()));
999
            };
1000
0
            let awaited_action = subscriber
1001
0
                .borrow()
1002
0
                .await
1003
0
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
1004
0
            if !self
1005
0
                .apply_filter_predicate(&awaited_action, &subscriber, &filter)
1006
0
                .await
1007
            {
1008
0
                return Ok(Box::pin(stream::empty()));
1009
0
            }
1010
0
            return Ok(Box::pin(stream::once(async move {
1011
0
                to_action_state_result(subscriber)
1012
0
            })));
1013
626
        }
1014
626
        if let Some(
client_operation_id4
) = &filter.client_operation_id {
1015
4
            let maybe_subscriber = self
1016
4
                .action_db
1017
4
                .get_awaited_action_by_id(client_operation_id)
1018
4
                .await
1019
4
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
;
1020
4
            let Some(subscriber) = maybe_subscriber else {
1021
0
                return Ok(Box::pin(stream::empty()));
1022
            };
1023
4
            let awaited_action = subscriber
1024
4
                .borrow()
1025
4
                .await
1026
4
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
;
1027
4
            if !self
1028
4
                .apply_filter_predicate(&awaited_action, &subscriber, &filter)
1029
4
                .await
1030
            {
1031
0
                return Ok(Box::pin(stream::empty()));
1032
4
            }
1033
4
            return Ok(Box::pin(stream::once(async move {
1034
4
                to_action_state_result(subscriber)
1035
4
            })));
1036
622
        }
1037
1038
620
        let Some(sorted_awaited_action_state) =
1039
622
            sorted_awaited_action_state_for_flags(filter.stages)
1040
        else {
1041
2
            let mut all_items: Vec<_> = self
1042
2
                .action_db
1043
2
                .get_all_awaited_actions()
1044
2
                .await
1045
2
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
1046
2
                .and_then(|awaited_action_subscriber| async move 
{0
1047
0
                    let awaited_action = awaited_action_subscriber
1048
0
                        .borrow()
1049
0
                        .await
1050
0
                        .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")?;
1051
0
                    Ok((awaited_action_subscriber, awaited_action))
1052
0
                })
1053
2
                .try_filter_map(|(subscriber, awaited_action)| 
{0
1054
0
                    let filter = filter.clone();
1055
0
                    async move {
1056
0
                        Ok(self
1057
0
                            .apply_filter_predicate(&awaited_action, &subscriber, &filter)
1058
0
                            .await
1059
0
                            .then_some((subscriber, awaited_action.sort_key())))
1060
0
                    }
1061
0
                })
1062
2
                .try_collect()
1063
2
                .await
1064
2
                .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
;
1065
1066
            #[allow(clippy::unnecessary_sort_by)]
1067
0
            match filter.order_by_priority_direction {
1068
0
                Some(OrderDirection::Asc) => all_items.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)),
1069
0
                Some(OrderDirection::Desc) => all_items.sort_unstable_by(|(_, a), (_, b)| b.cmp(a)),
1070
2
                None => {}
1071
            }
1072
2
            return Ok(Box::pin(stream::iter(
1073
2
                all_items
1074
2
                    .into_iter()
1075
2
                    .map(move |(subscriber, _)| 
to_action_state_result0
(
subscriber0
)),
1076
            )));
1077
        };
1078
1079
620
        let desc = 
matches!500
(
1080
120
            filter.order_by_priority_direction,
1081
            Some(OrderDirection::Desc)
1082
        );
1083
620
        let stream = self
1084
620
            .action_db
1085
620
            .get_range_of_actions(
1086
620
                sorted_awaited_action_state,
1087
620
                Bound::Unbounded,
1088
620
                Bound::Unbounded,
1089
620
                desc,
1090
620
            )
1091
620
            .await
1092
620
            .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
1093
620
            .and_then(|awaited_action_subscriber| async move 
{557
1094
557
                let awaited_action = awaited_action_subscriber
1095
557
                    .borrow()
1096
557
                    .await
1097
557
                    .err_tip(|| "In SimpleSchedulerStateManager::filter_operations")
?0
;
1098
557
                Ok((awaited_action_subscriber, awaited_action))
1099
1.11k
            })
1100
620
            .try_filter_map(move |(subscriber, awaited_action)| 
{557
1101
557
                let filter = filter.clone();
1102
557
                async move {
1103
557
                    Ok(self
1104
557
                        .apply_filter_predicate(&awaited_action, &subscriber, &filter)
1105
557
                        .await
1106
557
                        .then_some(subscriber))
1107
557
                }
1108
557
            })
1109
620
            .map(move |result| -> Box<dyn ActionStateResult> 
{557
1110
557
                result.map_or_else(
1111
0
                    |e| -> Box<dyn ActionStateResult> { Box::new(ErrorActionStateResult(e)) },
1112
557
                    |v| -> Box<dyn ActionStateResult> { to_action_state_result(v) },
1113
                )
1114
557
            });
1115
620
        Ok(Box::pin(stream))
1116
626
    }
1117
}
1118
1119
#[async_trait]
1120
impl<T, I, NowFn> ClientStateManager for SimpleSchedulerStateManager<T, I, NowFn>
1121
where
1122
    T: AwaitedActionDb,
1123
    I: InstantWrapper,
1124
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
1125
{
1126
    async fn add_action(
1127
        &self,
1128
        client_operation_id: OperationId,
1129
        action_info: Arc<ActionInfo>,
1130
32
    ) -> Result<Box<dyn ActionStateResult>, Error> {
1131
        let sub = self
1132
            .inner_add_operation(client_operation_id, action_info.clone())
1133
            .await?;
1134
1135
        Ok(Box::new(ClientActionStateResult::new(
1136
            sub,
1137
            self.weak_self.clone(),
1138
            self.no_event_action_timeout,
1139
            self.now_fn.clone(),
1140
        )))
1141
32
    }
1142
1143
    async fn filter_operations<'a>(
1144
        &'a self,
1145
        filter: OperationFilter,
1146
504
    ) -> Result<ActionStateResultStream<'a>, Error> {
1147
504
        self.inner_filter_operations(filter, move |rx| {
1148
504
            Box::new(ClientActionStateResult::new(
1149
504
                rx,
1150
504
                self.weak_self.clone(),
1151
504
                self.no_event_action_timeout,
1152
504
                self.now_fn.clone(),
1153
504
            ))
1154
504
        })
1155
        .await
1156
504
    }
1157
1158
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
1159
0
        None
1160
0
    }
1161
}
1162
1163
#[async_trait]
1164
impl<T, I, NowFn> WorkerStateManager for SimpleSchedulerStateManager<T, I, NowFn>
1165
where
1166
    T: AwaitedActionDb,
1167
    I: InstantWrapper,
1168
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
1169
{
1170
    async fn update_operation(
1171
        &self,
1172
        operation_id: &OperationId,
1173
        worker_id: &WorkerId,
1174
        update: UpdateOperationType,
1175
19
    ) -> Result<(), Error> {
1176
        self.inner_update_operation(operation_id, Some(worker_id), update)
1177
            .await
1178
19
    }
1179
}
1180
1181
#[async_trait]
1182
impl<T, I, NowFn> MatchingEngineStateManager for SimpleSchedulerStateManager<T, I, NowFn>
1183
where
1184
    T: AwaitedActionDb,
1185
    I: InstantWrapper,
1186
    NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
1187
{
1188
    async fn filter_operations<'a>(
1189
        &'a self,
1190
        filter: OperationFilter,
1191
122
    ) -> Result<ActionStateResultStream<'a>, Error> {
1192
57
        self.inner_filter_operations(filter, |rx| {
1193
57
            Box::new(MatchingEngineActionStateResult::new(
1194
57
                rx,
1195
57
                self.weak_self.clone(),
1196
57
                self.no_event_action_timeout,
1197
57
                self.now_fn.clone(),
1198
57
            ))
1199
57
        })
1200
        .await
1201
122
    }
1202
1203
    async fn assign_operation(
1204
        &self,
1205
        operation_id: &OperationId,
1206
        worker_id_or_reason_for_unassign: Result<&WorkerId, Error>,
1207
39
    ) -> Result<(), Error> {
1208
        let (maybe_worker_id, update) = match worker_id_or_reason_for_unassign {
1209
            Ok(worker_id) => (
1210
                Some(worker_id),
1211
                UpdateOperationType::UpdateWithActionStage(ActionStage::Executing),
1212
            ),
1213
            Err(err) => (None, UpdateOperationType::UpdateWithError(err)),
1214
        };
1215
        self.inner_update_operation(operation_id, maybe_worker_id, update)
1216
            .await
1217
39
    }
1218
}