Coverage Report

Created: 2025-07-30 16:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/store_awaited_action_db.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::ops::Bound;
16
use core::sync::atomic::{AtomicU64, Ordering};
17
use core::time::Duration;
18
use std::borrow::Cow;
19
use std::sync::{Arc, Weak};
20
21
use bytes::Bytes;
22
use futures::{Stream, TryStreamExt};
23
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
24
use nativelink_metric::MetricsComponent;
25
use nativelink_util::action_messages::{
26
    ActionInfo, ActionStage, ActionUniqueQualifier, OperationId,
27
};
28
use nativelink_util::instant_wrapper::InstantWrapper;
29
use nativelink_util::spawn;
30
use nativelink_util::store_trait::{
31
    FalseValue, SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore,
32
    SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
33
    SchedulerSubscription, SchedulerSubscriptionManager, StoreKey, TrueValue,
34
};
35
use nativelink_util::task::JoinHandleDropGuard;
36
use tokio::sync::Notify;
37
use tracing::{error, warn};
38
39
use crate::awaited_action_db::{
40
    AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, CLIENT_KEEPALIVE_DURATION,
41
    SortedAwaitedAction, SortedAwaitedActionState,
42
};
43
44
/// Alias used wherever an [`OperationId`] is the identifier a client handed to
/// us, as opposed to the operation id the awaited action is stored under.
type ClientOperationId = OperationId;
45
46
/// Upper bound on how often a failed client keep-alive update is retried.
///
/// The retry loop gives up once its retry counter is strictly greater than
/// this value, i.e. after the initial attempt plus this many retries.
const MAX_RETRIES_FOR_CLIENT_KEEPALIVE: u32 = 8;
48
49
/// Lazy subscription state of an `OperationSubscriber`.
enum OperationSubscriberState<Subscription> {
    /// No store subscription has been created yet.
    Unsubscribed,
    /// A live store subscription for the awaited action's key.
    Subscribed(Subscription),
}
53
54
pub struct OperationSubscriber<S: SchedulerStore, I: InstantWrapper, NowFn: Fn() -> I> {
55
    maybe_client_operation_id: Option<ClientOperationId>,
56
    subscription_key: OperationIdToAwaitedAction<'static>,
57
    weak_store: Weak<S>,
58
    state: OperationSubscriberState<
59
        <S::SubscriptionManager as SchedulerSubscriptionManager>::Subscription,
60
    >,
61
    last_known_keepalive_ts: AtomicU64,
62
    now_fn: NowFn,
63
}
64
65
impl<S: SchedulerStore, I: InstantWrapper, NowFn: Fn() -> I + core::fmt::Debug> core::fmt::Debug
66
    for OperationSubscriber<S, I, NowFn>
67
where
68
    OperationSubscriberState<
69
        <S::SubscriptionManager as SchedulerSubscriptionManager>::Subscription,
70
    >: core::fmt::Debug,
71
{
72
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
73
0
        f.debug_struct("OperationSubscriber")
74
0
            .field("maybe_client_operation_id", &self.maybe_client_operation_id)
75
0
            .field("subscription_key", &self.subscription_key)
76
0
            .field("weak_store", &self.weak_store)
77
0
            .field("state", &self.state)
78
0
            .field("last_known_keepalive_ts", &self.last_known_keepalive_ts)
79
0
            .field("now_fn", &self.now_fn)
80
0
            .finish()
81
0
    }
82
}
83
impl<S, I, NowFn> OperationSubscriber<S, I, NowFn>
84
where
85
    S: SchedulerStore,
86
    I: InstantWrapper,
87
    NowFn: Fn() -> I,
88
{
89
5
    const fn new(
90
5
        maybe_client_operation_id: Option<ClientOperationId>,
91
5
        subscription_key: OperationIdToAwaitedAction<'static>,
92
5
        weak_store: Weak<S>,
93
5
        now_fn: NowFn,
94
5
    ) -> Self {
95
5
        Self {
96
5
            maybe_client_operation_id,
97
5
            subscription_key,
98
5
            weak_store,
99
5
            last_known_keepalive_ts: AtomicU64::new(0),
100
5
            state: OperationSubscriberState::Unsubscribed,
101
5
            now_fn,
102
5
        }
103
5
    }
104
105
4
    async fn inner_get_awaited_action(
106
4
        store: &S,
107
4
        key: OperationIdToAwaitedAction<'_>,
108
4
        maybe_client_operation_id: Option<ClientOperationId>,
109
4
        last_known_keepalive_ts: &AtomicU64,
110
4
    ) -> Result<AwaitedAction, Error> {
111
4
        let mut awaited_action = store
112
4
            .get_and_decode(key.borrow())
113
4
            .await
114
4
            .err_tip(|| format!(
"In OperationSubscriber::get_awaited_action {key:?}"0
))
?0
115
4
            .ok_or_else(|| 
{0
116
0
                make_err!(
117
0
                    Code::NotFound,
118
                    "Could not find AwaitedAction for the given operation id {key:?}",
119
                )
120
0
            })?;
121
4
        if let Some(client_operation_id) = maybe_client_operation_id {
  Branch (121:16): [True: 0, False: 0]
  Branch (121:16): [Folded - Ignored]
  Branch (121:16): [True: 4, False: 0]
122
4
            awaited_action.set_client_operation_id(client_operation_id);
123
4
        
}0
124
4
        last_known_keepalive_ts.store(
125
4
            awaited_action
126
4
                .last_client_keepalive_timestamp()
127
4
                .unix_timestamp(),
128
4
            Ordering::Release,
129
        );
130
4
        Ok(awaited_action)
131
4
    }
132
133
    #[expect(clippy::future_not_send)] // TODO(jhpratt) remove this
134
2
    async fn get_awaited_action(&self) -> Result<AwaitedAction, Error> {
135
2
        let store = self
136
2
            .weak_store
137
2
            .upgrade()
138
2
            .err_tip(|| "Store gone in OperationSubscriber::get_awaited_action")
?0
;
139
2
        Self::inner_get_awaited_action(
140
2
            store.as_ref(),
141
2
            self.subscription_key.borrow(),
142
2
            self.maybe_client_operation_id.clone(),
143
2
            &self.last_known_keepalive_ts,
144
2
        )
145
2
        .await
146
2
    }
147
}
148
149
impl<S, I, NowFn> AwaitedActionSubscriber for OperationSubscriber<S, I, NowFn>
150
where
151
    S: SchedulerStore,
152
    I: InstantWrapper,
153
    NowFn: Fn() -> I + Send + Sync + 'static,
154
{
155
2
    async fn changed(&mut self) -> Result<AwaitedAction, Error> {
156
2
        let store = self
157
2
            .weak_store
158
2
            .upgrade()
159
2
            .err_tip(|| "Store gone in OperationSubscriber::get_awaited_action")
?0
;
160
2
        let subscription = match &mut self.state {
161
            OperationSubscriberState::Unsubscribed => {
162
1
                let subscription = store
163
1
                    .subscription_manager()
164
1
                    .err_tip(|| "In OperationSubscriber::changed::subscription_manager")
?0
165
1
                    .subscribe(self.subscription_key.borrow())
166
1
                    .err_tip(|| "In OperationSubscriber::changed::subscribe")
?0
;
167
1
                self.state = OperationSubscriberState::Subscribed(subscription);
168
1
                let OperationSubscriberState::Subscribed(subscription) = &mut self.state else {
  Branch (168:21): [True: 0, False: 0]
  Branch (168:21): [Folded - Ignored]
  Branch (168:21): [True: 1, False: 0]
169
0
                    unreachable!("Subscription should be in Subscribed state");
170
                };
171
1
                subscription
172
            }
173
1
            OperationSubscriberState::Subscribed(subscription) => subscription,
174
        };
175
176
2
        let changed_fut = subscription.changed();
177
2
        tokio::pin!(changed_fut);
178
        loop {
179
2
            let mut retries = 0;
180
            loop {
181
2
                let last_known_keepalive_ts = self.last_known_keepalive_ts.load(Ordering::Acquire);
182
2
                if I::from_secs(last_known_keepalive_ts).elapsed() <= CLIENT_KEEPALIVE_DURATION {
  Branch (182:20): [True: 0, False: 0]
  Branch (182:20): [Folded - Ignored]
  Branch (182:20): [True: 2, False: 0]
183
2
                    break; // We are still within the keep alive duration.
184
0
                }
185
0
                if retries > MAX_RETRIES_FOR_CLIENT_KEEPALIVE {
  Branch (185:20): [True: 0, False: 0]
  Branch (185:20): [Folded - Ignored]
  Branch (185:20): [True: 0, False: 0]
186
0
                    return Err(make_err!(
187
0
                        Code::Aborted,
188
0
                        "Could not update client keep alive for AwaitedAction",
189
0
                    ));
190
0
                }
191
0
                let mut awaited_action = Self::inner_get_awaited_action(
192
0
                    store.as_ref(),
193
0
                    self.subscription_key.borrow(),
194
0
                    self.maybe_client_operation_id.clone(),
195
0
                    &self.last_known_keepalive_ts,
196
0
                )
197
0
                .await
198
0
                .err_tip(|| "In OperationSubscriber::changed")?;
199
0
                awaited_action.update_client_keep_alive((self.now_fn)().now());
200
0
                let update_res = inner_update_awaited_action(store.as_ref(), awaited_action)
201
0
                    .await
202
0
                    .err_tip(|| "In OperationSubscriber::changed");
203
0
                if update_res.is_ok() {
  Branch (203:20): [True: 0, False: 0]
  Branch (203:20): [Folded - Ignored]
  Branch (203:20): [True: 0, False: 0]
204
0
                    break;
205
0
                }
206
0
                retries += 1;
207
                // Wait a tick before retrying.
208
0
                (self.now_fn)().sleep(Duration::from_millis(100)).await;
209
            }
210
2
            let sleep_fut = (self.now_fn)().sleep(CLIENT_KEEPALIVE_DURATION);
211
2
            tokio::select! {
212
2
                result = &mut changed_fut => {
213
2
                    result
?0
;
214
2
                    break;
215
                }
216
2
                () = sleep_fut => {
217
0
                    // If we haven't received any updates for a while, we should
218
0
                    // let the database know that we are still listening to prevent
219
0
                    // the action from being dropped.
220
0
                }
221
            }
222
        }
223
224
2
        Self::inner_get_awaited_action(
225
2
            store.as_ref(),
226
2
            self.subscription_key.borrow(),
227
2
            self.maybe_client_operation_id.clone(),
228
2
            &self.last_known_keepalive_ts,
229
2
        )
230
2
        .await
231
2
        .err_tip(|| "In OperationSubscriber::changed")
232
2
    }
233
234
2
    async fn borrow(&self) -> Result<AwaitedAction, Error> {
235
2
        self.get_awaited_action()
236
2
            .await
237
2
            .err_tip(|| "In OperationSubscriber::borrow")
238
2
    }
239
}
240
241
5
fn awaited_action_decode(version: u64, data: &Bytes) -> Result<AwaitedAction, Error> {
242
5
    let mut awaited_action: AwaitedAction = serde_json::from_slice(data)
243
5
        .map_err(|e| make_input_err!("In AwaitedAction::decode - {e:?}"))
?0
;
244
5
    awaited_action.set_version(version);
245
5
    Ok(awaited_action)
246
5
}
247
248
/// Prefix of store keys that map an operation id to its `AwaitedAction`.
const OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX: &str = "aa_";
/// Prefix of store keys that map a client operation id to an operation id.
const CLIENT_ID_TO_OPERATION_ID_KEY_PREFIX: &str = "cid_";
250
251
#[derive(Debug)]
252
struct OperationIdToAwaitedAction<'a>(Cow<'a, OperationId>);
253
impl OperationIdToAwaitedAction<'_> {
254
9
    fn borrow(&self) -> OperationIdToAwaitedAction<'_> {
255
9
        OperationIdToAwaitedAction(Cow::Borrowed(self.0.as_ref()))
256
9
    }
257
}
258
impl SchedulerStoreKeyProvider for OperationIdToAwaitedAction<'_> {
259
    type Versioned = TrueValue;
260
11
    fn get_key(&self) -> StoreKey<'static> {
261
11
        StoreKey::Str(Cow::Owned(format!(
262
11
            "{OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX}{}",
263
11
            self.0
264
11
        )))
265
11
    }
266
}
267
impl SchedulerStoreDecodeTo for OperationIdToAwaitedAction<'_> {
268
    type DecodeOutput = AwaitedAction;
269
4
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
270
4
        awaited_action_decode(version, &data)
271
4
    }
272
}
273
274
struct ClientIdToOperationId<'a>(&'a OperationId);
275
impl SchedulerStoreKeyProvider for ClientIdToOperationId<'_> {
276
    type Versioned = FalseValue;
277
5
    fn get_key(&self) -> StoreKey<'static> {
278
5
        StoreKey::Str(Cow::Owned(format!(
279
5
            "{CLIENT_ID_TO_OPERATION_ID_KEY_PREFIX}{}",
280
5
            self.0
281
5
        )))
282
5
    }
283
}
284
impl SchedulerStoreDecodeTo for ClientIdToOperationId<'_> {
285
    type DecodeOutput = OperationId;
286
2
    fn decode(_version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
287
2
        serde_json::from_slice(&data).map_err(|e| 
{0
288
0
            make_input_err!(
289
                "In ClientIdToOperationId::decode - {e:?} (data: {:02x?})",
290
                data
291
            )
292
0
        })
293
2
    }
294
}
295
296
// TODO(aaronmondal) We only need operation_id here, it would be nice if we had a way
297
// to tell the decoder we only care about specific fields.
298
struct SearchUniqueQualifierToAwaitedAction<'a>(&'a ActionUniqueQualifier);
299
impl SchedulerIndexProvider for SearchUniqueQualifierToAwaitedAction<'_> {
300
    const KEY_PREFIX: &'static str = OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX;
301
    const INDEX_NAME: &'static str = "unique_qualifier";
302
    type Versioned = TrueValue;
303
3
    fn index_value(&self) -> Cow<'_, str> {
304
3
        Cow::Owned(format!("{}", self.0))
305
3
    }
306
}
307
impl SchedulerStoreDecodeTo for SearchUniqueQualifierToAwaitedAction<'_> {
308
    type DecodeOutput = AwaitedAction;
309
1
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
310
1
        awaited_action_decode(version, &data)
311
1
    }
312
}
313
314
struct SearchStateToAwaitedAction(&'static str);
315
impl SchedulerIndexProvider for SearchStateToAwaitedAction {
316
    const KEY_PREFIX: &'static str = OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX;
317
    const INDEX_NAME: &'static str = "state";
318
    const MAYBE_SORT_KEY: Option<&'static str> = Some("sort_key");
319
    type Versioned = TrueValue;
320
0
    fn index_value(&self) -> Cow<'_, str> {
321
0
        Cow::Borrowed(self.0)
322
0
    }
323
}
324
impl SchedulerStoreDecodeTo for SearchStateToAwaitedAction {
325
    type DecodeOutput = AwaitedAction;
326
0
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
327
0
        awaited_action_decode(version, &data)
328
0
    }
329
}
330
331
4
const fn get_state_prefix(state: SortedAwaitedActionState) -> &'static str {
332
4
    match state {
333
0
        SortedAwaitedActionState::CacheCheck => "cache_check",
334
3
        SortedAwaitedActionState::Queued => "queued",
335
1
        SortedAwaitedActionState::Executing => "executing",
336
0
        SortedAwaitedActionState::Completed => "completed",
337
    }
338
4
}
339
340
struct UpdateOperationIdToAwaitedAction(AwaitedAction);
341
impl SchedulerCurrentVersionProvider for UpdateOperationIdToAwaitedAction {
342
4
    fn current_version(&self) -> u64 {
343
4
        self.0.version()
344
4
    }
345
}
346
impl SchedulerStoreKeyProvider for UpdateOperationIdToAwaitedAction {
347
    type Versioned = TrueValue;
348
4
    fn get_key(&self) -> StoreKey<'static> {
349
4
        OperationIdToAwaitedAction(Cow::Borrowed(self.0.operation_id())).get_key()
350
4
    }
351
}
352
impl SchedulerStoreDataProvider for UpdateOperationIdToAwaitedAction {
353
4
    fn try_into_bytes(self) -> Result<Bytes, Error> {
354
4
        serde_json::to_string(&self.0)
355
4
            .map(Bytes::from)
356
4
            .map_err(|e| make_input_err!("Could not convert AwaitedAction to json - {e:?}"))
357
4
    }
358
4
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
359
4
        let unique_qualifier = &self.0.action_info().unique_qualifier;
360
4
        let maybe_unique_qualifier = match &unique_qualifier {
361
4
            ActionUniqueQualifier::Cacheable(_) => Some(unique_qualifier),
362
0
            ActionUniqueQualifier::Uncacheable(_) => None,
363
        };
364
4
        let mut output = Vec::with_capacity(2 + maybe_unique_qualifier.map_or(0, |_| 1));
365
4
        if maybe_unique_qualifier.is_some() {
  Branch (365:12): [True: 4, False: 0]
  Branch (365:12): [Folded - Ignored]
366
4
            output.push((
367
4
                "unique_qualifier",
368
4
                Bytes::from(unique_qualifier.to_string()),
369
4
            ));
370
4
        
}0
371
        {
372
4
            let state = SortedAwaitedActionState::try_from(&self.0.state().stage)
373
4
                .err_tip(|| "In UpdateOperationIdToAwaitedAction::get_index")
?0
;
374
4
            output.push(("state", Bytes::from(get_state_prefix(state))));
375
4
            let sorted_awaited_action = SortedAwaitedAction::from(&self.0);
376
4
            output.push((
377
4
                "sort_key",
378
4
                // We encode to hex to ensure that the sort key is lexicographically sorted.
379
4
                Bytes::from(format!("{:016x}", sorted_awaited_action.sort_key.as_u64())),
380
4
            ));
381
        }
382
4
        Ok(output)
383
4
    }
384
}
385
386
struct UpdateClientIdToOperationId {
387
    client_operation_id: ClientOperationId,
388
    operation_id: OperationId,
389
}
390
impl SchedulerStoreKeyProvider for UpdateClientIdToOperationId {
391
    type Versioned = FalseValue;
392
3
    fn get_key(&self) -> StoreKey<'static> {
393
3
        ClientIdToOperationId(&self.client_operation_id).get_key()
394
3
    }
395
}
396
impl SchedulerStoreDataProvider for UpdateClientIdToOperationId {
397
3
    fn try_into_bytes(self) -> Result<Bytes, Error> {
398
3
        serde_json::to_string(&self.operation_id)
399
3
            .map(Bytes::from)
400
3
            .map_err(|e| make_input_err!("Could not convert OperationId to json - {e:?}"))
401
3
    }
402
}
403
404
2
async fn inner_update_awaited_action(
405
2
    store: &impl SchedulerStore,
406
2
    mut new_awaited_action: AwaitedAction,
407
2
) -> Result<(), Error> {
408
2
    let operation_id = new_awaited_action.operation_id().clone();
409
2
    if new_awaited_action.state().client_operation_id != operation_id {
  Branch (409:8): [True: 0, False: 0]
  Branch (409:8): [Folded - Ignored]
  Branch (409:8): [True: 0, False: 2]
410
0
        // Just in case the client_operation_id was set to something else
411
0
        // we put it back to the underlying operation_id.
412
0
        new_awaited_action.set_client_operation_id(operation_id.clone());
413
2
    }
414
2
    let maybe_version = store
415
2
        .update_data(UpdateOperationIdToAwaitedAction(new_awaited_action))
416
2
        .await
417
2
        .err_tip(|| "In RedisAwaitedActionDb::update_awaited_action")
?0
;
418
2
    if maybe_version.is_none() {
  Branch (418:8): [True: 0, False: 0]
  Branch (418:8): [Folded - Ignored]
  Branch (418:8): [True: 0, False: 2]
419
0
        return Err(make_err!(
420
0
            Code::Aborted,
421
0
            "Could not update AwaitedAction because the version did not match for {operation_id:?}",
422
0
        ));
423
2
    }
424
2
    Ok(())
425
2
}
426
427
#[derive(Debug, MetricsComponent)]
428
pub struct StoreAwaitedActionDb<S, F, I, NowFn>
429
where
430
    S: SchedulerStore,
431
    F: Fn() -> OperationId,
432
    I: InstantWrapper,
433
    NowFn: Fn() -> I,
434
{
435
    store: Arc<S>,
436
    now_fn: NowFn,
437
    operation_id_creator: F,
438
    _pull_task_change_subscriber_spawn: JoinHandleDropGuard<()>,
439
}
440
441
impl<S, F, I, NowFn> StoreAwaitedActionDb<S, F, I, NowFn>
442
where
443
    S: SchedulerStore,
444
    F: Fn() -> OperationId,
445
    I: InstantWrapper,
446
    NowFn: Fn() -> I + Send + Sync + Clone + 'static,
447
{
448
2
    pub fn new(
449
2
        store: Arc<S>,
450
2
        task_change_publisher: Arc<Notify>,
451
2
        now_fn: NowFn,
452
2
        operation_id_creator: F,
453
2
    ) -> Result<Self, Error> {
454
2
        let mut subscription = store
455
2
            .subscription_manager()
456
2
            .err_tip(|| "In RedisAwaitedActionDb::new")
?0
457
2
            .subscribe(OperationIdToAwaitedAction(Cow::Owned(OperationId::String(
458
2
                String::new(),
459
2
            ))))
460
2
            .err_tip(|| "In RedisAwaitedActionDb::new")
?0
;
461
2
        let pull_task_change_subscriber = spawn!(
462
            "redis_awaited_action_db_pull_task_change_subscriber",
463
2
            async move {
464
                loop {
465
11
                    let 
changed_res9
= subscription
466
11
                        .changed()
467
11
                        .await
468
9
                        .err_tip(|| "In RedisAwaitedActionDb::new");
469
9
                    if let Err(
err0
) = changed_res {
  Branch (469:28): [True: 0, False: 0]
  Branch (469:28): [Folded - Ignored]
  Branch (469:28): [True: 0, False: 4]
  Branch (469:28): [True: 0, False: 5]
470
0
                        error!(
471
0
                            "Error waiting for pull task change subscriber in RedisAwaitedActionDb::new  - {err:?}"
472
                        );
473
                        // Sleep for a second to avoid a busy loop, then trigger the notify
474
                        // so if a reconnect happens we let local resources know that things
475
                        // might have changed.
476
0
                        tokio::time::sleep(Duration::from_secs(1)).await;
477
9
                    }
478
9
                    task_change_publisher.as_ref().notify_one();
479
                }
480
            }
481
        );
482
2
        Ok(Self {
483
2
            store,
484
2
            now_fn,
485
2
            operation_id_creator,
486
2
            _pull_task_change_subscriber_spawn: pull_task_change_subscriber,
487
2
        })
488
2
    }
489
490
    #[expect(clippy::future_not_send)] // TODO(jhpratt) remove this
491
3
    async fn try_subscribe(
492
3
        &self,
493
3
        client_operation_id: &ClientOperationId,
494
3
        unique_qualifier: &ActionUniqueQualifier,
495
3
        // TODO(aaronmondal) To simplify the scheduler 2024 refactor, we
496
3
        // removed the ability to upgrade priorities of actions.
497
3
        // we should add priority upgrades back in.
498
3
        _priority: i32,
499
3
    ) -> Result<Option<OperationSubscriber<S, I, NowFn>>, Error> {
500
3
        match unique_qualifier {
501
3
            ActionUniqueQualifier::Cacheable(_) => {}
502
0
            ActionUniqueQualifier::Uncacheable(_) => return Ok(None),
503
        }
504
3
        let stream = self
505
3
            .store
506
3
            .search_by_index_prefix(SearchUniqueQualifierToAwaitedAction(unique_qualifier))
507
3
            .await
508
3
            .err_tip(|| "In RedisAwaitedActionDb::try_subscribe")
?0
;
509
3
        tokio::pin!(stream);
510
3
        let maybe_awaited_action = stream
511
3
            .try_next()
512
3
            .await
513
3
            .err_tip(|| "In RedisAwaitedActionDb::try_subscribe")
?0
;
514
3
        match maybe_awaited_action {
515
1
            Some(mut awaited_action) => {
516
                // TODO(aaronmondal) We don't support joining completed jobs because we
517
                // need to also check that all the data is still in the cache.
518
1
                if awaited_action.state().stage.is_finished() {
  Branch (518:20): [True: 0, False: 0]
  Branch (518:20): [Folded - Ignored]
  Branch (518:20): [True: 0, False: 0]
  Branch (518:20): [True: 0, False: 1]
519
0
                    return Ok(None);
520
1
                }
521
                // TODO(aaronmondal) We only care about the operation_id here, we should
522
                // have a way to tell the decoder we only care about specific fields.
523
1
                let operation_id = awaited_action.operation_id().clone();
524
525
1
                awaited_action.update_client_keep_alive((self.now_fn)().now());
526
1
                let update_res = inner_update_awaited_action(self.store.as_ref(), awaited_action)
527
1
                    .await
528
1
                    .err_tip(|| "In OperationSubscriber::changed");
529
1
                if let Err(
err0
) = update_res {
  Branch (529:24): [True: 0, False: 0]
  Branch (529:24): [Folded - Ignored]
  Branch (529:24): [True: 0, False: 0]
  Branch (529:24): [True: 0, False: 1]
530
0
                    warn!(
531
0
                        "Error updating client keep alive in RedisAwaitedActionDb::try_subscribe - {err:?} - This is not a critical error, but we did decide to create a new action instead of joining an existing one."
532
                    );
533
0
                    return Ok(None);
534
1
                }
535
536
                // Add the client_operation_id to operation_id mapping
537
1
                self.store
538
1
                    .update_data(UpdateClientIdToOperationId {
539
1
                        client_operation_id: client_operation_id.clone(),
540
1
                        operation_id: operation_id.clone(),
541
1
                    })
542
1
                    .await
543
1
                    .err_tip(
544
                        || "In RedisAwaitedActionDb::try_subscribe while adding client mapping",
545
0
                    )?;
546
547
1
                Ok(Some(OperationSubscriber::new(
548
1
                    Some(client_operation_id.clone()),
549
1
                    OperationIdToAwaitedAction(Cow::Owned(operation_id)),
550
1
                    Arc::downgrade(&self.store),
551
1
                    self.now_fn.clone(),
552
1
                )))
553
            }
554
2
            None => Ok(None),
555
        }
556
3
    }
557
558
    #[expect(clippy::future_not_send)] // TODO(jhpratt) remove this
559
2
    async fn inner_get_awaited_action_by_id(
560
2
        &self,
561
2
        client_operation_id: &ClientOperationId,
562
2
    ) -> Result<Option<OperationSubscriber<S, I, NowFn>>, Error> {
563
2
        let maybe_operation_id = self
564
2
            .store
565
2
            .get_and_decode(ClientIdToOperationId(client_operation_id))
566
2
            .await
567
2
            .err_tip(|| "In RedisAwaitedActionDb::get_awaited_action_by_id")
?0
;
568
2
        let Some(operation_id) = maybe_operation_id else {
  Branch (568:13): [True: 0, False: 0]
  Branch (568:13): [Folded - Ignored]
  Branch (568:13): [True: 1, False: 0]
  Branch (568:13): [True: 1, False: 0]
569
0
            return Ok(None);
570
        };
571
2
        Ok(Some(OperationSubscriber::new(
572
2
            Some(client_operation_id.clone()),
573
2
            OperationIdToAwaitedAction(Cow::Owned(operation_id)),
574
2
            Arc::downgrade(&self.store),
575
2
            self.now_fn.clone(),
576
2
        )))
577
2
    }
578
}
579
580
impl<S, F, I, NowFn> AwaitedActionDb for StoreAwaitedActionDb<S, F, I, NowFn>
581
where
582
    S: SchedulerStore,
583
    F: Fn() -> OperationId + Send + Sync + Unpin + 'static,
584
    I: InstantWrapper,
585
    NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static,
586
{
587
    type Subscriber = OperationSubscriber<S, I, NowFn>;
588
589
2
    async fn get_awaited_action_by_id(
590
2
        &self,
591
2
        client_operation_id: &ClientOperationId,
592
2
    ) -> Result<Option<Self::Subscriber>, Error> {
593
2
        self.inner_get_awaited_action_by_id(client_operation_id)
594
2
            .await
595
2
    }
596
597
0
    async fn get_by_operation_id(
598
0
        &self,
599
0
        operation_id: &OperationId,
600
0
    ) -> Result<Option<Self::Subscriber>, Error> {
601
0
        Ok(Some(OperationSubscriber::new(
602
0
            None,
603
0
            OperationIdToAwaitedAction(Cow::Owned(operation_id.clone())),
604
0
            Arc::downgrade(&self.store),
605
0
            self.now_fn.clone(),
606
0
        )))
607
0
    }
608
609
1
    async fn update_awaited_action(&self, new_awaited_action: AwaitedAction) -> Result<(), Error> {
610
1
        inner_update_awaited_action(self.store.as_ref(), new_awaited_action).await
611
1
    }
612
613
3
    async fn add_action(
614
3
        &self,
615
3
        client_operation_id: ClientOperationId,
616
3
        action_info: Arc<ActionInfo>,
617
3
    ) -> Result<Self::Subscriber, Error> {
618
        // Check to see if the action is already known and subscribe if it is.
619
3
        let subscription = self
620
3
            .try_subscribe(
621
3
                &client_operation_id,
622
3
                &action_info.unique_qualifier,
623
3
                action_info.priority,
624
3
            )
625
3
            .await
626
3
            .err_tip(|| "In RedisAwaitedActionDb::add_action")
?0
;
627
3
        if let Some(
sub1
) = subscription {
  Branch (627:16): [True: 0, False: 0]
  Branch (627:16): [Folded - Ignored]
  Branch (627:16): [True: 0, False: 1]
  Branch (627:16): [True: 1, False: 1]
628
1
            return Ok(sub);
629
2
        }
630
631
2
        let new_operation_id = (self.operation_id_creator)();
632
2
        let awaited_action =
633
2
            AwaitedAction::new(new_operation_id.clone(), action_info, (self.now_fn)().now());
634
2
        debug_assert!(
635
0
            ActionStage::Queued == awaited_action.state().stage,
636
0
            "Expected action to be queued"
637
        );
638
639
        // Note: Version is not needed with this api.
640
2
        let _version = self
641
2
            .store
642
2
            .update_data(UpdateOperationIdToAwaitedAction(awaited_action))
643
2
            .await
644
2
            .err_tip(|| "In RedisAwaitedActionDb::add_action")
?0
645
2
            .err_tip(
646
                || "Version match failed for new action insert in RedisAwaitedActionDb::add_action",
647
0
            )?;
648
649
2
        self.store
650
2
            .update_data(UpdateClientIdToOperationId {
651
2
                client_operation_id: client_operation_id.clone(),
652
2
                operation_id: new_operation_id.clone(),
653
2
            })
654
2
            .await
655
2
            .err_tip(|| "In RedisAwaitedActionDb::add_action")
?0
;
656
657
2
        Ok(OperationSubscriber::new(
658
2
            Some(client_operation_id),
659
2
            OperationIdToAwaitedAction(Cow::Owned(new_operation_id)),
660
2
            Arc::downgrade(&self.store),
661
2
            self.now_fn.clone(),
662
2
        ))
663
3
    }
664
665
0
    async fn get_range_of_actions(
666
0
        &self,
667
0
        state: SortedAwaitedActionState,
668
0
        start: Bound<SortedAwaitedAction>,
669
0
        end: Bound<SortedAwaitedAction>,
670
0
        desc: bool,
671
0
    ) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>> + Send, Error> {
672
0
        if !matches!(start, Bound::Unbounded) {
  Branch (672:12): [True: 0, False: 0]
  Branch (672:12): [Folded - Ignored]
673
0
            return Err(make_err!(
674
0
                Code::Unimplemented,
675
0
                "Start bound is not supported in RedisAwaitedActionDb::get_range_of_actions",
676
0
            ));
677
0
        }
678
0
        if !matches!(end, Bound::Unbounded) {
  Branch (678:12): [True: 0, False: 0]
  Branch (678:12): [Folded - Ignored]
679
0
            return Err(make_err!(
680
0
                Code::Unimplemented,
681
0
                "Start bound is not supported in RedisAwaitedActionDb::get_range_of_actions",
682
0
            ));
683
0
        }
684
        // TODO(aaronmondal) This API is not difficult to implement, but there is no code path
685
        // that uses it, so no reason to implement it yet.
686
0
        if !desc {
  Branch (686:12): [True: 0, False: 0]
  Branch (686:12): [Folded - Ignored]
687
0
            return Err(make_err!(
688
0
                Code::Unimplemented,
689
0
                "Descending order is not supported in RedisAwaitedActionDb::get_range_of_actions",
690
0
            ));
691
0
        }
692
0
        Ok(self
693
0
            .store
694
0
            .search_by_index_prefix(SearchStateToAwaitedAction(get_state_prefix(state)))
695
0
            .await
696
0
            .err_tip(|| "In RedisAwaitedActionDb::get_range_of_actions")?
697
0
            .map_ok(move |awaited_action| {
698
0
                OperationSubscriber::new(
699
0
                    None,
700
0
                    OperationIdToAwaitedAction(Cow::Owned(awaited_action.operation_id().clone())),
701
0
                    Arc::downgrade(&self.store),
702
0
                    self.now_fn.clone(),
703
                )
704
0
            }))
705
0
    }
706
707
0
    async fn get_all_awaited_actions(
708
0
        &self,
709
0
    ) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>>, Error> {
710
0
        Ok(self
711
0
            .store
712
0
            .search_by_index_prefix(SearchStateToAwaitedAction(""))
713
0
            .await
714
0
            .err_tip(|| "In RedisAwaitedActionDb::get_range_of_actions")?
715
0
            .map_ok(move |awaited_action| {
716
0
                OperationSubscriber::new(
717
0
                    None,
718
0
                    OperationIdToAwaitedAction(Cow::Owned(awaited_action.operation_id().clone())),
719
0
                    Arc::downgrade(&self.store),
720
0
                    self.now_fn.clone(),
721
                )
722
0
            }))
723
0
    }
724
}