Coverage Report

Created: 2025-04-19 16:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/store_awaited_action_db.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::ops::Bound;
17
use std::sync::atomic::{AtomicU64, Ordering};
18
use std::sync::{Arc, Weak};
19
use std::time::Duration;
20
21
use bytes::Bytes;
22
use futures::{Stream, TryStreamExt};
23
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
24
use nativelink_metric::MetricsComponent;
25
use nativelink_util::action_messages::{
26
    ActionInfo, ActionStage, ActionUniqueQualifier, OperationId,
27
};
28
use nativelink_util::instant_wrapper::InstantWrapper;
29
use nativelink_util::spawn;
30
use nativelink_util::store_trait::{
31
    FalseValue, SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore,
32
    SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
33
    SchedulerSubscription, SchedulerSubscriptionManager, StoreKey, TrueValue,
34
};
35
use nativelink_util::task::JoinHandleDropGuard;
36
use tokio::sync::Notify;
37
use tracing::{Level, event};
38
39
use crate::awaited_action_db::{
40
    AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, CLIENT_KEEPALIVE_DURATION,
41
    SortedAwaitedAction, SortedAwaitedActionState,
42
};
43
44
type ClientOperationId = OperationId;
45
46
/// Maximum number of retries to update client keep alive.
47
const MAX_RETRIES_FOR_CLIENT_KEEPALIVE: u32 = 8;
48
49
/// Subscription state of an `OperationSubscriber`.
///
/// `Debug` is derived so that the manual `Debug` implementation on
/// `OperationSubscriber`, which is bounded on this type being `Debug`, can
/// actually be satisfied whenever the wrapped subscription type is `Debug`.
#[derive(Debug)]
enum OperationSubscriberState<Sub> {
    /// No subscription has been opened with the store yet.
    Unsubscribed,
    /// A live subscription obtained from the store's subscription manager.
    Subscribed(Sub),
}
53
54
pub struct OperationSubscriber<S: SchedulerStore, I: InstantWrapper, NowFn: Fn() -> I> {
55
    maybe_client_operation_id: Option<ClientOperationId>,
56
    subscription_key: OperationIdToAwaitedAction<'static>,
57
    weak_store: Weak<S>,
58
    state: OperationSubscriberState<
59
        <S::SubscriptionManager as SchedulerSubscriptionManager>::Subscription,
60
    >,
61
    last_known_keepalive_ts: AtomicU64,
62
    now_fn: NowFn,
63
}
64
65
impl<S: SchedulerStore, I: InstantWrapper, NowFn: Fn() -> I + std::fmt::Debug> std::fmt::Debug
66
    for OperationSubscriber<S, I, NowFn>
67
where
68
    OperationSubscriberState<
69
        <S::SubscriptionManager as SchedulerSubscriptionManager>::Subscription,
70
    >: std::fmt::Debug,
71
{
72
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73
0
        f.debug_struct("OperationSubscriber")
74
0
            .field("maybe_client_operation_id", &self.maybe_client_operation_id)
75
0
            .field("subscription_key", &self.subscription_key)
76
0
            .field("weak_store", &self.weak_store)
77
0
            .field("state", &self.state)
78
0
            .field("last_known_keepalive_ts", &self.last_known_keepalive_ts)
79
0
            .field("now_fn", &self.now_fn)
80
0
            .finish()
81
0
    }
82
}
83
impl<S, I, NowFn> OperationSubscriber<S, I, NowFn>
84
where
85
    S: SchedulerStore,
86
    I: InstantWrapper,
87
    NowFn: Fn() -> I,
88
{
89
1
    const fn new(
90
1
        maybe_client_operation_id: Option<ClientOperationId>,
91
1
        subscription_key: OperationIdToAwaitedAction<'static>,
92
1
        weak_store: Weak<S>,
93
1
        now_fn: NowFn,
94
1
    ) -> Self {
95
1
        Self {
96
1
            maybe_client_operation_id,
97
1
            subscription_key,
98
1
            weak_store,
99
1
            last_known_keepalive_ts: AtomicU64::new(0),
100
1
            state: OperationSubscriberState::Unsubscribed,
101
1
            now_fn,
102
1
        }
103
1
    }
104
105
2
    async fn inner_get_awaited_action(
106
2
        store: &S,
107
2
        key: OperationIdToAwaitedAction<'_>,
108
2
        maybe_client_operation_id: Option<ClientOperationId>,
109
2
        last_known_keepalive_ts: &AtomicU64,
110
2
    ) -> Result<AwaitedAction, Error> {
111
2
        let mut awaited_action = store
112
2
            .get_and_decode(key.borrow())
113
2
            .await
114
2
            .err_tip(|| 
format!("In OperationSubscriber::get_awaited_action {key:?}")0
)
?0
115
2
            .ok_or_else(|| {
116
0
                make_err!(
117
0
                    Code::NotFound,
118
0
                    "Could not find AwaitedAction for the given operation id {key:?}",
119
0
                )
120
0
            })?;
121
2
        if let Some(client_operation_id) = maybe_client_operation_id {
  Branch (121:16): [True: 0, False: 0]
  Branch (121:16): [Folded - Ignored]
  Branch (121:16): [True: 2, False: 0]
122
2
            awaited_action.set_client_operation_id(client_operation_id);
123
2
        
}0
124
2
        last_known_keepalive_ts.store(
125
2
            awaited_action
126
2
                .last_client_keepalive_timestamp()
127
2
                .unix_timestamp(),
128
2
            Ordering::Release,
129
2
        );
130
2
        Ok(awaited_action)
131
2
    }
132
133
0
    async fn get_awaited_action(&self) -> Result<AwaitedAction, Error> {
134
0
        let store = self
135
0
            .weak_store
136
0
            .upgrade()
137
0
            .err_tip(|| "Store gone in OperationSubscriber::get_awaited_action")?;
138
0
        Self::inner_get_awaited_action(
139
0
            store.as_ref(),
140
0
            self.subscription_key.borrow(),
141
0
            self.maybe_client_operation_id.clone(),
142
0
            &self.last_known_keepalive_ts,
143
0
        )
144
0
        .await
145
0
    }
146
}
147
148
impl<S, I, NowFn> AwaitedActionSubscriber for OperationSubscriber<S, I, NowFn>
149
where
150
    S: SchedulerStore,
151
    I: InstantWrapper,
152
    NowFn: Fn() -> I + Send + Sync + 'static,
153
{
154
2
    async fn changed(&mut self) -> Result<AwaitedAction, Error> {
155
2
        let store = self
156
2
            .weak_store
157
2
            .upgrade()
158
2
            .err_tip(|| 
"Store gone in OperationSubscriber::get_awaited_action"0
)
?0
;
159
2
        let subscription = match &mut self.state {
160
            OperationSubscriberState::Unsubscribed => {
161
1
                let subscription = store
162
1
                    .subscription_manager()
163
1
                    .err_tip(|| 
"In OperationSubscriber::changed::subscription_manager"0
)
?0
164
1
                    .subscribe(self.subscription_key.borrow())
165
1
                    .err_tip(|| 
"In OperationSubscriber::changed::subscribe"0
)
?0
;
166
1
                self.state = OperationSubscriberState::Subscribed(subscription);
167
1
                let OperationSubscriberState::Subscribed(subscription) = &mut self.state else {
  Branch (167:21): [True: 0, False: 0]
  Branch (167:21): [Folded - Ignored]
  Branch (167:21): [True: 1, False: 0]
168
0
                    unreachable!("Subscription should be in Subscribed state");
169
                };
170
1
                subscription
171
            }
172
1
            OperationSubscriberState::Subscribed(subscription) => subscription,
173
        };
174
175
2
        let changed_fut = subscription.changed();
176
2
        tokio::pin!(changed_fut);
177
        loop {
178
2
            let mut retries = 0;
179
            loop {
180
2
                let last_known_keepalive_ts = self.last_known_keepalive_ts.load(Ordering::Acquire);
181
2
                if I::from_secs(last_known_keepalive_ts).elapsed() <= CLIENT_KEEPALIVE_DURATION {
  Branch (181:20): [True: 0, False: 0]
  Branch (181:20): [Folded - Ignored]
  Branch (181:20): [True: 2, False: 0]
182
2
                    break; // We are still within the keep alive duration.
183
0
                }
184
0
                if retries > MAX_RETRIES_FOR_CLIENT_KEEPALIVE {
  Branch (184:20): [True: 0, False: 0]
  Branch (184:20): [Folded - Ignored]
  Branch (184:20): [True: 0, False: 0]
185
0
                    return Err(make_err!(
186
0
                        Code::Aborted,
187
0
                        "Could not update client keep alive for AwaitedAction",
188
0
                    ));
189
0
                }
190
0
                let mut awaited_action = Self::inner_get_awaited_action(
191
0
                    store.as_ref(),
192
0
                    self.subscription_key.borrow(),
193
0
                    self.maybe_client_operation_id.clone(),
194
0
                    &self.last_known_keepalive_ts,
195
0
                )
196
0
                .await
197
0
                .err_tip(|| "In OperationSubscriber::changed")?;
198
0
                awaited_action.update_client_keep_alive((self.now_fn)().now());
199
0
                let update_res = inner_update_awaited_action(store.as_ref(), awaited_action)
200
0
                    .await
201
0
                    .err_tip(|| "In OperationSubscriber::changed");
202
0
                if update_res.is_ok() {
  Branch (202:20): [True: 0, False: 0]
  Branch (202:20): [Folded - Ignored]
  Branch (202:20): [True: 0, False: 0]
203
0
                    break;
204
0
                }
205
0
                retries += 1;
206
0
                // Wait a tick before retrying.
207
0
                (self.now_fn)().sleep(Duration::from_millis(100)).await;
208
            }
209
2
            let sleep_fut = (self.now_fn)().sleep(CLIENT_KEEPALIVE_DURATION);
210
2
            tokio::select! {
211
2
                result = &mut changed_fut => {
212
2
                    result
?0
;
213
2
                    break;
214
                }
215
2
                () = sleep_fut => {
216
0
                    // If we haven't received any updates for a while, we should
217
0
                    // let the database know that we are still listening to prevent
218
0
                    // the action from being dropped.
219
0
                }
220
            }
221
        }
222
223
2
        Self::inner_get_awaited_action(
224
2
            store.as_ref(),
225
2
            self.subscription_key.borrow(),
226
2
            self.maybe_client_operation_id.clone(),
227
2
            &self.last_known_keepalive_ts,
228
2
        )
229
2
        .await
230
2
        .err_tip(|| 
"In OperationSubscriber::changed"0
)
231
2
    }
232
233
0
    async fn borrow(&self) -> Result<AwaitedAction, Error> {
234
0
        self.get_awaited_action()
235
0
            .await
236
0
            .err_tip(|| "In OperationSubscriber::borrow")
237
0
    }
238
}
239
240
2
fn awaited_action_decode(version: u64, data: &Bytes) -> Result<AwaitedAction, Error> {
241
2
    let mut awaited_action: AwaitedAction = serde_json::from_slice(data)
242
2
        .map_err(|e| 
make_input_err!("In AwaitedAction::decode - {e:?}")0
)
?0
;
243
2
    awaited_action.set_version(version);
244
2
    Ok(awaited_action)
245
2
}
246
247
/// Prefix of store keys that map an operation id to its awaited action.
const OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX: &str = "aa_";
/// Prefix of store keys that map a client operation id to an operation id.
const CLIENT_ID_TO_OPERATION_ID_KEY_PREFIX: &str = "cid_";
249
250
#[derive(Debug)]
251
struct OperationIdToAwaitedAction<'a>(Cow<'a, OperationId>);
252
impl OperationIdToAwaitedAction<'_> {
253
5
    fn borrow(&self) -> OperationIdToAwaitedAction<'_> {
254
5
        OperationIdToAwaitedAction(Cow::Borrowed(self.0.as_ref()))
255
5
    }
256
}
257
impl SchedulerStoreKeyProvider for OperationIdToAwaitedAction<'_> {
258
    type Versioned = TrueValue;
259
6
    fn get_key(&self) -> StoreKey<'static> {
260
6
        StoreKey::Str(Cow::Owned(format!(
261
6
            "{OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX}{}",
262
6
            self.0
263
6
        )))
264
6
    }
265
}
266
impl SchedulerStoreDecodeTo for OperationIdToAwaitedAction<'_> {
267
    type DecodeOutput = AwaitedAction;
268
2
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
269
2
        awaited_action_decode(version, &data)
270
2
    }
271
}
272
273
struct ClientIdToOperationId<'a>(&'a OperationId);
274
impl SchedulerStoreKeyProvider for ClientIdToOperationId<'_> {
275
    type Versioned = FalseValue;
276
1
    fn get_key(&self) -> StoreKey<'static> {
277
1
        StoreKey::Str(Cow::Owned(format!(
278
1
            "{CLIENT_ID_TO_OPERATION_ID_KEY_PREFIX}{}",
279
1
            self.0
280
1
        )))
281
1
    }
282
}
283
impl SchedulerStoreDecodeTo for ClientIdToOperationId<'_> {
284
    type DecodeOutput = OperationId;
285
0
    fn decode(_version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
286
0
        OperationId::try_from(data).err_tip(|| "In ClientIdToOperationId::decode")
287
0
    }
288
}
289
290
// TODO(allada) We only need operation_id here, it would be nice if we had a way
291
// to tell the decoder we only care about specific fields.
292
struct SearchUniqueQualifierToAwaitedAction<'a>(&'a ActionUniqueQualifier);
293
impl SchedulerIndexProvider for SearchUniqueQualifierToAwaitedAction<'_> {
294
    const KEY_PREFIX: &'static str = OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX;
295
    const INDEX_NAME: &'static str = "unique_qualifier";
296
    type Versioned = TrueValue;
297
1
    fn index_value(&self) -> Cow<'_, str> {
298
1
        Cow::Owned(format!("{}", self.0))
299
1
    }
300
}
301
impl SchedulerStoreDecodeTo for SearchUniqueQualifierToAwaitedAction<'_> {
302
    type DecodeOutput = AwaitedAction;
303
0
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
304
0
        awaited_action_decode(version, &data)
305
0
    }
306
}
307
308
struct SearchStateToAwaitedAction(&'static str);
309
impl SchedulerIndexProvider for SearchStateToAwaitedAction {
310
    const KEY_PREFIX: &'static str = OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX;
311
    const INDEX_NAME: &'static str = "state";
312
    const MAYBE_SORT_KEY: Option<&'static str> = Some("sort_key");
313
    type Versioned = TrueValue;
314
0
    fn index_value(&self) -> Cow<'_, str> {
315
0
        Cow::Borrowed(self.0)
316
0
    }
317
}
318
impl SchedulerStoreDecodeTo for SearchStateToAwaitedAction {
319
    type DecodeOutput = AwaitedAction;
320
0
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
321
0
        awaited_action_decode(version, &data)
322
0
    }
323
}
324
325
2
const fn get_state_prefix(state: SortedAwaitedActionState) -> &'static str {
326
2
    match state {
327
0
        SortedAwaitedActionState::CacheCheck => "cache_check",
328
1
        SortedAwaitedActionState::Queued => "queued",
329
1
        SortedAwaitedActionState::Executing => "executing",
330
0
        SortedAwaitedActionState::Completed => "completed",
331
    }
332
2
}
333
334
struct UpdateOperationIdToAwaitedAction(AwaitedAction);
335
impl SchedulerCurrentVersionProvider for UpdateOperationIdToAwaitedAction {
336
2
    fn current_version(&self) -> u64 {
337
2
        self.0.version()
338
2
    }
339
}
340
impl SchedulerStoreKeyProvider for UpdateOperationIdToAwaitedAction {
341
    type Versioned = TrueValue;
342
2
    fn get_key(&self) -> StoreKey<'static> {
343
2
        OperationIdToAwaitedAction(Cow::Borrowed(self.0.operation_id())).get_key()
344
2
    }
345
}
346
impl SchedulerStoreDataProvider for UpdateOperationIdToAwaitedAction {
347
2
    fn try_into_bytes(self) -> Result<Bytes, Error> {
348
2
        serde_json::to_string(&self.0)
349
2
            .map(Bytes::from)
350
2
            .map_err(|e| 
make_input_err!("Could not convert AwaitedAction to json - {e:?}")0
)
351
2
    }
352
2
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
353
2
        let unique_qualifier = &self.0.action_info().unique_qualifier;
354
2
        let maybe_unique_qualifier = match &unique_qualifier {
355
2
            ActionUniqueQualifier::Cachable(_) => Some(unique_qualifier),
356
0
            ActionUniqueQualifier::Uncachable(_) => None,
357
        };
358
2
        let mut output = Vec::with_capacity(2 + maybe_unique_qualifier.map_or(0, |_| 1));
359
2
        if maybe_unique_qualifier.is_some() {
  Branch (359:12): [True: 2, False: 0]
  Branch (359:12): [Folded - Ignored]
360
2
            output.push((
361
2
                "unique_qualifier",
362
2
                Bytes::from(unique_qualifier.to_string()),
363
2
            ));
364
2
        
}0
365
        {
366
2
            let state = SortedAwaitedActionState::try_from(&self.0.state().stage)
367
2
                .err_tip(|| 
"In UpdateOperationIdToAwaitedAction::get_index"0
)
?0
;
368
2
            output.push(("state", Bytes::from(get_state_prefix(state))));
369
2
            let sorted_awaited_action = SortedAwaitedAction::from(&self.0);
370
2
            output.push((
371
2
                "sort_key",
372
2
                // We encode to hex to ensure that the sort key is lexicographically sorted.
373
2
                Bytes::from(format!("{:016x}", sorted_awaited_action.sort_key.as_u64())),
374
2
            ));
375
2
        }
376
2
        Ok(output)
377
2
    }
378
}
379
380
struct UpdateClientIdToOperationId {
381
    client_operation_id: ClientOperationId,
382
    operation_id: OperationId,
383
}
384
impl SchedulerStoreKeyProvider for UpdateClientIdToOperationId {
385
    type Versioned = FalseValue;
386
1
    fn get_key(&self) -> StoreKey<'static> {
387
1
        ClientIdToOperationId(&self.client_operation_id).get_key()
388
1
    }
389
}
390
impl SchedulerStoreDataProvider for UpdateClientIdToOperationId {
391
1
    fn try_into_bytes(self) -> Result<Bytes, Error> {
392
1
        serde_json::to_string(&self.operation_id)
393
1
            .map(Bytes::from)
394
1
            .map_err(|e| 
make_input_err!("Could not convert OperationId to json - {e:?}")0
)
395
1
    }
396
}
397
398
1
async fn inner_update_awaited_action(
399
1
    store: &impl SchedulerStore,
400
1
    mut new_awaited_action: AwaitedAction,
401
1
) -> Result<(), Error> {
402
1
    let operation_id = new_awaited_action.operation_id().clone();
403
1
    if new_awaited_action.state().client_operation_id != operation_id {
  Branch (403:8): [True: 0, False: 0]
  Branch (403:8): [Folded - Ignored]
  Branch (403:8): [True: 0, False: 1]
404
0
        // Just in case the client_operation_id was set to something else
405
0
        // we put it back to the underlying operation_id.
406
0
        new_awaited_action.set_client_operation_id(operation_id.clone());
407
1
    }
408
1
    let maybe_version = store
409
1
        .update_data(UpdateOperationIdToAwaitedAction(new_awaited_action))
410
1
        .await
411
1
        .err_tip(|| 
"In RedisAwaitedActionDb::update_awaited_action"0
)
?0
;
412
1
    if maybe_version.is_none() {
  Branch (412:8): [True: 0, False: 0]
  Branch (412:8): [Folded - Ignored]
  Branch (412:8): [True: 0, False: 1]
413
0
        return Err(make_err!(
414
0
            Code::Aborted,
415
0
            "Could not update AwaitedAction because the version did not match for {operation_id:?}",
416
0
        ));
417
1
    }
418
1
    Ok(())
419
1
}
420
421
0
#[derive(Debug, MetricsComponent)]
422
pub struct StoreAwaitedActionDb<S, F, I, NowFn>
423
where
424
    S: SchedulerStore,
425
    F: Fn() -> OperationId,
426
    I: InstantWrapper,
427
    NowFn: Fn() -> I,
428
{
429
    store: Arc<S>,
430
    now_fn: NowFn,
431
    operation_id_creator: F,
432
    _pull_task_change_subscriber_spawn: JoinHandleDropGuard<()>,
433
}
434
435
impl<S, F, I, NowFn> StoreAwaitedActionDb<S, F, I, NowFn>
436
where
437
    S: SchedulerStore,
438
    F: Fn() -> OperationId,
439
    I: InstantWrapper,
440
    NowFn: Fn() -> I + Send + Sync + Clone + 'static,
441
{
442
1
    pub fn new(
443
1
        store: Arc<S>,
444
1
        task_change_publisher: Arc<Notify>,
445
1
        now_fn: NowFn,
446
1
        operation_id_creator: F,
447
1
    ) -> Result<Self, Error> {
448
1
        let mut subscription = store
449
1
            .subscription_manager()
450
1
            .err_tip(|| 
"In RedisAwaitedActionDb::new"0
)
?0
451
1
            .subscribe(OperationIdToAwaitedAction(Cow::Owned(OperationId::String(
452
1
                String::new(),
453
1
            ))))
454
1
            .err_tip(|| 
"In RedisAwaitedActionDb::new"0
)
?0
;
455
1
        let pull_task_change_subscriber = spawn!(
456
            "redis_awaited_action_db_pull_task_change_subscriber",
457
1
            async move {
458
                loop {
459
5
                    let 
changed_res4
= subscription
460
5
                        .changed()
461
5
                        .await
462
4
                        .err_tip(|| 
"In RedisAwaitedActionDb::new"0
);
463
4
                    if let Err(
err0
) = changed_res {
  Branch (463:28): [True: 0, False: 0]
  Branch (463:28): [Folded - Ignored]
  Branch (463:28): [True: 0, False: 4]
464
0
                        event!(
465
0
                            Level::ERROR,
466
0
                            "Error waiting for pull task change subscriber in RedisAwaitedActionDb::new  - {err:?}"
467
                        );
468
                        // Sleep for a second to avoid a busy loop, then trigger the notify
469
                        // so if a reconnect happens we let local resources know that things
470
                        // might have changed.
471
0
                        tokio::time::sleep(Duration::from_secs(1)).await;
472
4
                    }
473
4
                    task_change_publisher.as_ref().notify_one();
474
                }
475
            }
476
        );
477
1
        Ok(Self {
478
1
            store,
479
1
            now_fn,
480
1
            operation_id_creator,
481
1
            _pull_task_change_subscriber_spawn: pull_task_change_subscriber,
482
1
        })
483
1
    }
484
485
1
    async fn try_subscribe(
486
1
        &self,
487
1
        client_operation_id: &ClientOperationId,
488
1
        unique_qualifier: &ActionUniqueQualifier,
489
1
        // TODO(allada) To simplify the scheduler 2024 refactor, we
490
1
        // removed the ability to upgrade priorities of actions.
491
1
        // we should add priority upgrades back in.
492
1
        _priority: i32,
493
1
    ) -> Result<Option<OperationSubscriber<S, I, NowFn>>, Error> {
494
1
        match unique_qualifier {
495
1
            ActionUniqueQualifier::Cachable(_) => {}
496
0
            ActionUniqueQualifier::Uncachable(_) => return Ok(None),
497
        }
498
1
        let stream = self
499
1
            .store
500
1
            .search_by_index_prefix(SearchUniqueQualifierToAwaitedAction(unique_qualifier))
501
1
            .await
502
1
            .err_tip(|| 
"In RedisAwaitedActionDb::try_subscribe"0
)
?0
;
503
1
        tokio::pin!(stream);
504
1
        let maybe_awaited_action = stream
505
1
            .try_next()
506
1
            .await
507
1
            .err_tip(|| 
"In RedisAwaitedActionDb::try_subscribe"0
)
?0
;
508
1
        match maybe_awaited_action {
509
0
            Some(mut awaited_action) => {
510
0
                // TODO(allada) We don't support joining completed jobs because we
511
0
                // need to also check that all the data is still in the cache.
512
0
                if awaited_action.state().stage.is_finished() {
  Branch (512:20): [True: 0, False: 0]
  Branch (512:20): [Folded - Ignored]
  Branch (512:20): [True: 0, False: 0]
513
0
                    return Ok(None);
514
0
                }
515
0
                // TODO(allada) We only care about the operation_id here, we should
516
0
                // have a way to tell the decoder we only care about specific fields.
517
0
                let operation_id = awaited_action.operation_id().clone();
518
0
519
0
                awaited_action.update_client_keep_alive((self.now_fn)().now());
520
0
                let update_res = inner_update_awaited_action(self.store.as_ref(), awaited_action)
521
0
                    .await
522
0
                    .err_tip(|| "In OperationSubscriber::changed");
523
0
                if let Err(err) = update_res {
  Branch (523:24): [True: 0, False: 0]
  Branch (523:24): [Folded - Ignored]
  Branch (523:24): [True: 0, False: 0]
524
0
                    event!(
525
0
                        Level::WARN,
526
0
                        "Error updating client keep alive in RedisAwaitedActionDb::try_subscribe - {err:?} - This is not a critical error, but we did decide to create a new action instead of joining an existing one."
527
                    );
528
0
                    return Ok(None);
529
0
                }
530
0
531
0
                Ok(Some(OperationSubscriber::new(
532
0
                    Some(client_operation_id.clone()),
533
0
                    OperationIdToAwaitedAction(Cow::Owned(operation_id)),
534
0
                    Arc::downgrade(&self.store),
535
0
                    self.now_fn.clone(),
536
0
                )))
537
            }
538
1
            None => Ok(None),
539
        }
540
1
    }
541
542
0
    async fn inner_get_awaited_action_by_id(
543
0
        &self,
544
0
        client_operation_id: &ClientOperationId,
545
0
    ) -> Result<Option<OperationSubscriber<S, I, NowFn>>, Error> {
546
0
        let maybe_operation_id = self
547
0
            .store
548
0
            .get_and_decode(ClientIdToOperationId(client_operation_id))
549
0
            .await
550
0
            .err_tip(|| "In RedisAwaitedActionDb::get_awaited_action_by_id")?;
551
0
        let Some(operation_id) = maybe_operation_id else {
  Branch (551:13): [True: 0, False: 0]
  Branch (551:13): [Folded - Ignored]
552
0
            return Ok(None);
553
        };
554
0
        Ok(Some(OperationSubscriber::new(
555
0
            Some(client_operation_id.clone()),
556
0
            OperationIdToAwaitedAction(Cow::Owned(operation_id)),
557
0
            Arc::downgrade(&self.store),
558
0
            self.now_fn.clone(),
559
0
        )))
560
0
    }
561
}
562
563
impl<S, F, I, NowFn> AwaitedActionDb for StoreAwaitedActionDb<S, F, I, NowFn>
564
where
565
    S: SchedulerStore,
566
    F: Fn() -> OperationId + Send + Sync + Unpin + 'static,
567
    I: InstantWrapper,
568
    NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static,
569
{
570
    type Subscriber = OperationSubscriber<S, I, NowFn>;
571
572
0
    async fn get_awaited_action_by_id(
573
0
        &self,
574
0
        client_operation_id: &ClientOperationId,
575
0
    ) -> Result<Option<Self::Subscriber>, Error> {
576
0
        self.inner_get_awaited_action_by_id(client_operation_id)
577
0
            .await
578
0
    }
579
580
0
    async fn get_by_operation_id(
581
0
        &self,
582
0
        operation_id: &OperationId,
583
0
    ) -> Result<Option<Self::Subscriber>, Error> {
584
0
        Ok(Some(OperationSubscriber::new(
585
0
            None,
586
0
            OperationIdToAwaitedAction(Cow::Owned(operation_id.clone())),
587
0
            Arc::downgrade(&self.store),
588
0
            self.now_fn.clone(),
589
0
        )))
590
0
    }
591
592
1
    async fn update_awaited_action(&self, new_awaited_action: AwaitedAction) -> Result<(), Error> {
593
1
        inner_update_awaited_action(self.store.as_ref(), new_awaited_action).await
594
1
    }
595
596
1
    async fn add_action(
597
1
        &self,
598
1
        client_operation_id: ClientOperationId,
599
1
        action_info: Arc<ActionInfo>,
600
1
    ) -> Result<Self::Subscriber, Error> {
601
        // Check to see if the action is already known and subscribe if it is.
602
1
        let subscription = self
603
1
            .try_subscribe(
604
1
                &client_operation_id,
605
1
                &action_info.unique_qualifier,
606
1
                action_info.priority,
607
1
            )
608
1
            .await
609
1
            .err_tip(|| 
"In RedisAwaitedActionDb::add_action"0
)
?0
;
610
1
        if let Some(
sub0
) = subscription {
  Branch (610:16): [True: 0, False: 0]
  Branch (610:16): [Folded - Ignored]
  Branch (610:16): [True: 0, False: 1]
611
0
            return Ok(sub);
612
1
        }
613
1
614
1
        let new_operation_id = (self.operation_id_creator)();
615
1
        let awaited_action =
616
1
            AwaitedAction::new(new_operation_id.clone(), action_info, (self.now_fn)().now());
617
1
        debug_assert!(
618
0
            ActionStage::Queued == awaited_action.state().stage,
619
0
            "Expected action to be queued"
620
        );
621
622
        // Note: Version is not needed with this api.
623
1
        let _version = self
624
1
            .store
625
1
            .update_data(UpdateOperationIdToAwaitedAction(awaited_action))
626
1
            .await
627
1
            .err_tip(|| 
"In RedisAwaitedActionDb::add_action"0
)
?0
628
1
            .err_tip(
629
0
                || "Version match failed for new action insert in RedisAwaitedActionDb::add_action",
630
0
            )?;
631
632
1
        self.store
633
1
            .update_data(UpdateClientIdToOperationId {
634
1
                client_operation_id: client_operation_id.clone(),
635
1
                operation_id: new_operation_id.clone(),
636
1
            })
637
1
            .await
638
1
            .err_tip(|| 
"In RedisAwaitedActionDb::add_action"0
)
?0
;
639
640
1
        Ok(OperationSubscriber::new(
641
1
            Some(client_operation_id),
642
1
            OperationIdToAwaitedAction(Cow::Owned(new_operation_id)),
643
1
            Arc::downgrade(&self.store),
644
1
            self.now_fn.clone(),
645
1
        ))
646
1
    }
647
648
0
    async fn get_range_of_actions(
649
0
        &self,
650
0
        state: SortedAwaitedActionState,
651
0
        start: Bound<SortedAwaitedAction>,
652
0
        end: Bound<SortedAwaitedAction>,
653
0
        desc: bool,
654
0
    ) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>> + Send, Error> {
655
0
        if !matches!(start, Bound::Unbounded) {
  Branch (655:12): [True: 0, False: 0]
  Branch (655:12): [Folded - Ignored]
656
0
            return Err(make_err!(
657
0
                Code::Unimplemented,
658
0
                "Start bound is not supported in RedisAwaitedActionDb::get_range_of_actions",
659
0
            ));
660
0
        }
661
0
        if !matches!(end, Bound::Unbounded) {
  Branch (661:12): [True: 0, False: 0]
  Branch (661:12): [Folded - Ignored]
662
0
            return Err(make_err!(
663
0
                Code::Unimplemented,
664
0
                "Start bound is not supported in RedisAwaitedActionDb::get_range_of_actions",
665
0
            ));
666
0
        }
667
0
        // TODO(allada) This API is not difficult to implement, but there is no code path
668
0
        // that uses it, so no reason to implement it yet.
669
0
        if !desc {
  Branch (669:12): [True: 0, False: 0]
  Branch (669:12): [Folded - Ignored]
670
0
            return Err(make_err!(
671
0
                Code::Unimplemented,
672
0
                "Descending order is not supported in RedisAwaitedActionDb::get_range_of_actions",
673
0
            ));
674
0
        }
675
0
        Ok(self
676
0
            .store
677
0
            .search_by_index_prefix(SearchStateToAwaitedAction(get_state_prefix(state)))
678
0
            .await
679
0
            .err_tip(|| "In RedisAwaitedActionDb::get_range_of_actions")?
680
0
            .map_ok(move |awaited_action| {
681
0
                OperationSubscriber::new(
682
0
                    None,
683
0
                    OperationIdToAwaitedAction(Cow::Owned(awaited_action.operation_id().clone())),
684
0
                    Arc::downgrade(&self.store),
685
0
                    self.now_fn.clone(),
686
0
                )
687
0
            }))
688
0
    }
689
690
0
    async fn get_all_awaited_actions(
691
0
        &self,
692
0
    ) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>>, Error> {
693
0
        Ok(self
694
0
            .store
695
0
            .search_by_index_prefix(SearchStateToAwaitedAction(""))
696
0
            .await
697
0
            .err_tip(|| "In RedisAwaitedActionDb::get_range_of_actions")?
698
0
            .map_ok(move |awaited_action| {
699
0
                OperationSubscriber::new(
700
0
                    None,
701
0
                    OperationIdToAwaitedAction(Cow::Owned(awaited_action.operation_id().clone())),
702
0
                    Arc::downgrade(&self.store),
703
0
                    self.now_fn.clone(),
704
0
                )
705
0
            }))
706
0
    }
707
}