Coverage Report

Created: 2025-03-08 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/store_awaited_action_db.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::ops::Bound;
17
use std::sync::atomic::{AtomicU64, Ordering};
18
use std::sync::{Arc, Weak};
19
use std::time::Duration;
20
21
use bytes::Bytes;
22
use futures::{Stream, TryStreamExt};
23
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
24
use nativelink_metric::MetricsComponent;
25
use nativelink_util::action_messages::{
26
    ActionInfo, ActionStage, ActionUniqueQualifier, OperationId,
27
};
28
use nativelink_util::instant_wrapper::InstantWrapper;
29
use nativelink_util::spawn;
30
use nativelink_util::store_trait::{
31
    FalseValue, SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore,
32
    SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
33
    SchedulerSubscription, SchedulerSubscriptionManager, StoreKey, TrueValue,
34
};
35
use nativelink_util::task::JoinHandleDropGuard;
36
use tokio::sync::Notify;
37
use tracing::{event, Level};
38
39
use crate::awaited_action_db::{
40
    AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedAction,
41
    SortedAwaitedActionState, CLIENT_KEEPALIVE_DURATION,
42
};
43
44
type ClientOperationId = OperationId;
45
46
/// Maximum number of retries to update client keep alive.
47
const MAX_RETRIES_FOR_CLIENT_KEEPALIVE: u32 = 8;
48
49
enum OperationSubscriberState<Sub> {
50
    Unsubscribed,
51
    Subscribed(Sub),
52
}
53
54
pub struct OperationSubscriber<S: SchedulerStore, I: InstantWrapper, NowFn: Fn() -> I> {
55
    maybe_client_operation_id: Option<ClientOperationId>,
56
    subscription_key: OperationIdToAwaitedAction<'static>,
57
    weak_store: Weak<S>,
58
    state: OperationSubscriberState<
59
        <S::SubscriptionManager as SchedulerSubscriptionManager>::Subscription,
60
    >,
61
    last_known_keepalive_ts: AtomicU64,
62
    now_fn: NowFn,
63
}
64
impl<S, I, NowFn> OperationSubscriber<S, I, NowFn>
65
where
66
    S: SchedulerStore,
67
    I: InstantWrapper,
68
    NowFn: Fn() -> I,
69
{
70
1
    const fn new(
71
1
        maybe_client_operation_id: Option<ClientOperationId>,
72
1
        subscription_key: OperationIdToAwaitedAction<'static>,
73
1
        weak_store: Weak<S>,
74
1
        now_fn: NowFn,
75
1
    ) -> Self {
76
1
        Self {
77
1
            maybe_client_operation_id,
78
1
            subscription_key,
79
1
            weak_store,
80
1
            last_known_keepalive_ts: AtomicU64::new(0),
81
1
            state: OperationSubscriberState::Unsubscribed,
82
1
            now_fn,
83
1
        }
84
1
    }
85
86
2
    async fn inner_get_awaited_action(
87
2
        store: &S,
88
2
        key: OperationIdToAwaitedAction<'_>,
89
2
        maybe_client_operation_id: Option<ClientOperationId>,
90
2
        last_known_keepalive_ts: &AtomicU64,
91
2
    ) -> Result<AwaitedAction, Error> {
92
2
        let mut awaited_action = store
93
2
            .get_and_decode(key.borrow())
94
2
            .await
95
2
            .err_tip(|| 
format!("In OperationSubscriber::get_awaited_action {key:?}")0
)
?0
96
2
            .ok_or_else(|| {
97
0
                make_err!(
98
0
                    Code::NotFound,
99
0
                    "Could not find AwaitedAction for the given operation id {key:?}",
100
0
                )
101
2
            })
?0
;
102
2
        if let Some(client_operation_id) = maybe_client_operation_id {
  Branch (102:16): [True: 0, False: 0]
  Branch (102:16): [Folded - Ignored]
  Branch (102:16): [True: 2, False: 0]
103
2
            awaited_action.set_client_operation_id(client_operation_id);
104
2
        
}0
105
2
        last_known_keepalive_ts.store(
106
2
            awaited_action
107
2
                .last_client_keepalive_timestamp()
108
2
                .unix_timestamp(),
109
2
            Ordering::Release,
110
2
        );
111
2
        Ok(awaited_action)
112
2
    }
113
114
0
    async fn get_awaited_action(&self) -> Result<AwaitedAction, Error> {
115
0
        let store = self
116
0
            .weak_store
117
0
            .upgrade()
118
0
            .err_tip(|| "Store gone in OperationSubscriber::get_awaited_action")?;
119
0
        Self::inner_get_awaited_action(
120
0
            store.as_ref(),
121
0
            self.subscription_key.borrow(),
122
0
            self.maybe_client_operation_id.clone(),
123
0
            &self.last_known_keepalive_ts,
124
0
        )
125
0
        .await
126
0
    }
127
}
128
129
impl<S, I, NowFn> AwaitedActionSubscriber for OperationSubscriber<S, I, NowFn>
130
where
131
    S: SchedulerStore,
132
    I: InstantWrapper,
133
    NowFn: Fn() -> I + Send + Sync + 'static,
134
{
135
2
    async fn changed(&mut self) -> Result<AwaitedAction, Error> {
136
2
        let store = self
137
2
            .weak_store
138
2
            .upgrade()
139
2
            .err_tip(|| 
"Store gone in OperationSubscriber::get_awaited_action"0
)
?0
;
140
2
        let subscription = match &mut self.state {
141
            OperationSubscriberState::Unsubscribed => {
142
1
                let subscription = store
143
1
                    .subscription_manager()
144
1
                    .err_tip(|| 
"In OperationSubscriber::changed::subscription_manager"0
)
?0
145
1
                    .subscribe(self.subscription_key.borrow())
146
1
                    .err_tip(|| 
"In OperationSubscriber::changed::subscribe"0
)
?0
;
147
1
                self.state = OperationSubscriberState::Subscribed(subscription);
148
1
                let OperationSubscriberState::Subscribed(subscription) = &mut self.state else {
  Branch (148:21): [True: 0, False: 0]
  Branch (148:21): [Folded - Ignored]
  Branch (148:21): [True: 1, False: 0]
149
0
                    unreachable!("Subscription should be in Subscribed state");
150
                };
151
1
                subscription
152
            }
153
1
            OperationSubscriberState::Subscribed(subscription) => subscription,
154
        };
155
156
2
        let changed_fut = subscription.changed();
157
2
        tokio::pin!(changed_fut);
158
        loop {
159
2
            let mut retries = 0;
160
            loop {
161
2
                let last_known_keepalive_ts = self.last_known_keepalive_ts.load(Ordering::Acquire);
162
2
                if I::from_secs(last_known_keepalive_ts).elapsed() <= CLIENT_KEEPALIVE_DURATION {
  Branch (162:20): [True: 0, False: 0]
  Branch (162:20): [Folded - Ignored]
  Branch (162:20): [True: 2, False: 0]
163
2
                    break; // We are still within the keep alive duration.
164
0
                }
165
0
                if retries > MAX_RETRIES_FOR_CLIENT_KEEPALIVE {
  Branch (165:20): [True: 0, False: 0]
  Branch (165:20): [Folded - Ignored]
  Branch (165:20): [True: 0, False: 0]
166
0
                    return Err(make_err!(
167
0
                        Code::Aborted,
168
0
                        "Could not update client keep alive for AwaitedAction",
169
0
                    ));
170
0
                }
171
0
                let mut awaited_action = Self::inner_get_awaited_action(
172
0
                    store.as_ref(),
173
0
                    self.subscription_key.borrow(),
174
0
                    self.maybe_client_operation_id.clone(),
175
0
                    &self.last_known_keepalive_ts,
176
0
                )
177
0
                .await
178
0
                .err_tip(|| "In OperationSubscriber::changed")?;
179
0
                awaited_action.update_client_keep_alive((self.now_fn)().now());
180
0
                let update_res = inner_update_awaited_action(store.as_ref(), awaited_action)
181
0
                    .await
182
0
                    .err_tip(|| "In OperationSubscriber::changed");
183
0
                if update_res.is_ok() {
  Branch (183:20): [True: 0, False: 0]
  Branch (183:20): [Folded - Ignored]
  Branch (183:20): [True: 0, False: 0]
184
0
                    break;
185
0
                }
186
0
                retries += 1;
187
0
                // Wait a tick before retrying.
188
0
                (self.now_fn)().sleep(Duration::from_millis(100)).await;
189
            }
190
2
            let sleep_fut = (self.now_fn)().sleep(CLIENT_KEEPALIVE_DURATION);
191
2
            tokio::select! {
192
2
                result = &mut changed_fut => {
193
2
                    result
?0
;
194
2
                    break;
195
                }
196
2
                () = sleep_fut => {
197
0
                    // If we haven't received any updates for a while, we should
198
0
                    // let the database know that we are still listening to prevent
199
0
                    // the action from being dropped.
200
0
                }
201
            }
202
        }
203
204
2
        Self::inner_get_awaited_action(
205
2
            store.as_ref(),
206
2
            self.subscription_key.borrow(),
207
2
            self.maybe_client_operation_id.clone(),
208
2
            &self.last_known_keepalive_ts,
209
2
        )
210
2
        .await
211
2
        .err_tip(|| 
"In OperationSubscriber::changed"0
)
212
2
    }
213
214
0
    async fn borrow(&self) -> Result<AwaitedAction, Error> {
215
0
        self.get_awaited_action()
216
0
            .await
217
0
            .err_tip(|| "In OperationSubscriber::borrow")
218
0
    }
219
}
220
221
2
fn awaited_action_decode(version: u64, data: &Bytes) -> Result<AwaitedAction, Error> {
222
2
    let mut awaited_action: AwaitedAction = serde_json::from_slice(data)
223
2
        .map_err(|e| 
make_input_err!("In AwaitedAction::decode - {e:?}")0
)
?0
;
224
2
    awaited_action.set_version(version);
225
2
    Ok(awaited_action)
226
2
}
227
228
const OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX: &str = "aa_";
229
const CLIENT_ID_TO_OPERATION_ID_KEY_PREFIX: &str = "cid_";
230
231
#[derive(Debug)]
232
struct OperationIdToAwaitedAction<'a>(Cow<'a, OperationId>);
233
impl OperationIdToAwaitedAction<'_> {
234
5
    fn borrow(&self) -> OperationIdToAwaitedAction<'_> {
235
5
        OperationIdToAwaitedAction(Cow::Borrowed(self.0.as_ref()))
236
5
    }
237
}
238
impl SchedulerStoreKeyProvider for OperationIdToAwaitedAction<'_> {
239
    type Versioned = TrueValue;
240
6
    fn get_key(&self) -> StoreKey<'static> {
241
6
        StoreKey::Str(Cow::Owned(format!(
242
6
            "{OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX}{}",
243
6
            self.0
244
6
        )))
245
6
    }
246
}
247
impl SchedulerStoreDecodeTo for OperationIdToAwaitedAction<'_> {
248
    type DecodeOutput = AwaitedAction;
249
2
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
250
2
        awaited_action_decode(version, &data)
251
2
    }
252
}
253
254
struct ClientIdToOperationId<'a>(&'a OperationId);
255
impl SchedulerStoreKeyProvider for ClientIdToOperationId<'_> {
256
    type Versioned = FalseValue;
257
1
    fn get_key(&self) -> StoreKey<'static> {
258
1
        StoreKey::Str(Cow::Owned(format!(
259
1
            "{CLIENT_ID_TO_OPERATION_ID_KEY_PREFIX}{}",
260
1
            self.0
261
1
        )))
262
1
    }
263
}
264
impl SchedulerStoreDecodeTo for ClientIdToOperationId<'_> {
265
    type DecodeOutput = OperationId;
266
0
    fn decode(_version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
267
0
        OperationId::try_from(data).err_tip(|| "In ClientIdToOperationId::decode")
268
0
    }
269
}
270
271
// TODO(allada) We only need operation_id here, it would be nice if we had a way
272
// to tell the decoder we only care about specific fields.
273
struct SearchUniqueQualifierToAwaitedAction<'a>(&'a ActionUniqueQualifier);
274
impl SchedulerIndexProvider for SearchUniqueQualifierToAwaitedAction<'_> {
275
    const KEY_PREFIX: &'static str = OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX;
276
    const INDEX_NAME: &'static str = "unique_qualifier";
277
    type Versioned = TrueValue;
278
1
    fn index_value(&self) -> Cow<'_, str> {
279
1
        Cow::Owned(format!("{}", self.0))
280
1
    }
281
}
282
impl SchedulerStoreDecodeTo for SearchUniqueQualifierToAwaitedAction<'_> {
283
    type DecodeOutput = AwaitedAction;
284
0
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
285
0
        awaited_action_decode(version, &data)
286
0
    }
287
}
288
289
struct SearchStateToAwaitedAction(&'static str);
290
impl SchedulerIndexProvider for SearchStateToAwaitedAction {
291
    const KEY_PREFIX: &'static str = OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX;
292
    const INDEX_NAME: &'static str = "state";
293
    const MAYBE_SORT_KEY: Option<&'static str> = Some("sort_key");
294
    type Versioned = TrueValue;
295
0
    fn index_value(&self) -> Cow<'_, str> {
296
0
        Cow::Borrowed(self.0)
297
0
    }
298
}
299
impl SchedulerStoreDecodeTo for SearchStateToAwaitedAction {
300
    type DecodeOutput = AwaitedAction;
301
0
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
302
0
        awaited_action_decode(version, &data)
303
0
    }
304
}
305
306
2
const fn get_state_prefix(state: SortedAwaitedActionState) -> &'static str {
307
2
    match state {
308
0
        SortedAwaitedActionState::CacheCheck => "cache_check",
309
1
        SortedAwaitedActionState::Queued => "queued",
310
1
        SortedAwaitedActionState::Executing => "executing",
311
0
        SortedAwaitedActionState::Completed => "completed",
312
    }
313
2
}
314
315
struct UpdateOperationIdToAwaitedAction(AwaitedAction);
316
impl SchedulerCurrentVersionProvider for UpdateOperationIdToAwaitedAction {
317
2
    fn current_version(&self) -> u64 {
318
2
        self.0.version()
319
2
    }
320
}
321
impl SchedulerStoreKeyProvider for UpdateOperationIdToAwaitedAction {
322
    type Versioned = TrueValue;
323
2
    fn get_key(&self) -> StoreKey<'static> {
324
2
        OperationIdToAwaitedAction(Cow::Borrowed(self.0.operation_id())).get_key()
325
2
    }
326
}
327
impl SchedulerStoreDataProvider for UpdateOperationIdToAwaitedAction {
328
2
    fn try_into_bytes(self) -> Result<Bytes, Error> {
329
2
        serde_json::to_string(&self.0)
330
2
            .map(Bytes::from)
331
2
            .map_err(|e| 
make_input_err!("Could not convert AwaitedAction to json - {e:?}")0
)
332
2
    }
333
2
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
334
2
        let unique_qualifier = &self.0.action_info().unique_qualifier;
335
2
        let maybe_unique_qualifier = match &unique_qualifier {
336
2
            ActionUniqueQualifier::Cachable(_) => Some(unique_qualifier),
337
0
            ActionUniqueQualifier::Uncachable(_) => None,
338
        };
339
2
        let mut output = Vec::with_capacity(2 + maybe_unique_qualifier.map_or(0, |_| 1));
340
2
        if maybe_unique_qualifier.is_some() {
  Branch (340:12): [True: 2, False: 0]
  Branch (340:12): [Folded - Ignored]
341
2
            output.push((
342
2
                "unique_qualifier",
343
2
                Bytes::from(unique_qualifier.to_string()),
344
2
            ));
345
2
        
}0
346
        {
347
2
            let state = SortedAwaitedActionState::try_from(&self.0.state().stage)
348
2
                .err_tip(|| 
"In UpdateOperationIdToAwaitedAction::get_index"0
)
?0
;
349
2
            output.push(("state", Bytes::from(get_state_prefix(state))));
350
2
            let sorted_awaited_action = SortedAwaitedAction::from(&self.0);
351
2
            output.push((
352
2
                "sort_key",
353
2
                // We encode to hex to ensure that the sort key is lexicographically sorted.
354
2
                Bytes::from(format!("{:016x}", sorted_awaited_action.sort_key.as_u64())),
355
2
            ));
356
2
        }
357
2
        Ok(output)
358
2
    }
359
}
360
361
struct UpdateClientIdToOperationId {
362
    client_operation_id: ClientOperationId,
363
    operation_id: OperationId,
364
}
365
impl SchedulerStoreKeyProvider for UpdateClientIdToOperationId {
366
    type Versioned = FalseValue;
367
1
    fn get_key(&self) -> StoreKey<'static> {
368
1
        ClientIdToOperationId(&self.client_operation_id).get_key()
369
1
    }
370
}
371
impl SchedulerStoreDataProvider for UpdateClientIdToOperationId {
372
1
    fn try_into_bytes(self) -> Result<Bytes, Error> {
373
1
        serde_json::to_string(&self.operation_id)
374
1
            .map(Bytes::from)
375
1
            .map_err(|e| 
make_input_err!("Could not convert OperationId to json - {e:?}")0
)
376
1
    }
377
}
378
379
1
async fn inner_update_awaited_action(
380
1
    store: &impl SchedulerStore,
381
1
    mut new_awaited_action: AwaitedAction,
382
1
) -> Result<(), Error> {
383
1
    let operation_id = new_awaited_action.operation_id().clone();
384
1
    if new_awaited_action.state().client_operation_id != operation_id {
  Branch (384:8): [True: 0, False: 0]
  Branch (384:8): [Folded - Ignored]
  Branch (384:8): [True: 0, False: 1]
385
0
        // Just in case the client_operation_id was set to something else
386
0
        // we put it back to the underlying operation_id.
387
0
        new_awaited_action.set_client_operation_id(operation_id.clone());
388
1
    }
389
1
    let maybe_version = store
390
1
        .update_data(UpdateOperationIdToAwaitedAction(new_awaited_action))
391
1
        .await
392
1
        .err_tip(|| 
"In RedisAwaitedActionDb::update_awaited_action"0
)
?0
;
393
1
    if maybe_version.is_none() {
  Branch (393:8): [True: 0, False: 0]
  Branch (393:8): [Folded - Ignored]
  Branch (393:8): [True: 0, False: 1]
394
0
        return Err(make_err!(
395
0
            Code::Aborted,
396
0
            "Could not update AwaitedAction because the version did not match for {operation_id:?}",
397
0
        ));
398
1
    }
399
1
    Ok(())
400
1
}
401
402
0
#[derive(MetricsComponent)]
403
pub struct StoreAwaitedActionDb<S, F, I, NowFn>
404
where
405
    S: SchedulerStore,
406
    F: Fn() -> OperationId,
407
    I: InstantWrapper,
408
    NowFn: Fn() -> I,
409
{
410
    store: Arc<S>,
411
    now_fn: NowFn,
412
    operation_id_creator: F,
413
    _pull_task_change_subscriber_spawn: JoinHandleDropGuard<()>,
414
}
415
416
impl<S, F, I, NowFn> StoreAwaitedActionDb<S, F, I, NowFn>
417
where
418
    S: SchedulerStore,
419
    F: Fn() -> OperationId,
420
    I: InstantWrapper,
421
    NowFn: Fn() -> I + Send + Sync + Clone + 'static,
422
{
423
1
    pub fn new(
424
1
        store: Arc<S>,
425
1
        task_change_publisher: Arc<Notify>,
426
1
        now_fn: NowFn,
427
1
        operation_id_creator: F,
428
1
    ) -> Result<Self, Error> {
429
1
        let mut subscription = store
430
1
            .subscription_manager()
431
1
            .err_tip(|| 
"In RedisAwaitedActionDb::new"0
)
?0
432
1
            .subscribe(OperationIdToAwaitedAction(Cow::Owned(OperationId::String(
433
1
                String::new(),
434
1
            ))))
435
1
            .err_tip(|| 
"In RedisAwaitedActionDb::new"0
)
?0
;
436
1
        let pull_task_change_subscriber = spawn!(
437
1
            "redis_awaited_action_db_pull_task_change_subscriber",
438
1
            async move {
439
                loop {
440
5
                    let 
changed_res4
= subscription
441
5
                        .changed()
442
5
                        .await
443
4
                        .err_tip(|| 
"In RedisAwaitedActionDb::new"0
);
444
4
                    if let Err(
err0
) = changed_res {
  Branch (444:28): [True: 0, False: 0]
  Branch (444:28): [Folded - Ignored]
  Branch (444:28): [True: 0, False: 4]
445
0
                        event!(
446
0
                            Level::ERROR,
447
0
                            "Error waiting for pull task change subscriber in RedisAwaitedActionDb::new  - {err:?}"
448
                        );
449
                        // Sleep for a second to avoid a busy loop, then trigger the notify
450
                        // so if a reconnect happens we let local resources know that things
451
                        // might have changed.
452
0
                        tokio::time::sleep(Duration::from_secs(1)).await;
453
4
                    }
454
4
                    task_change_publisher.as_ref().notify_one();
455
                }
456
1
            }
457
1
        );
458
1
        Ok(Self {
459
1
            store,
460
1
            now_fn,
461
1
            operation_id_creator,
462
1
            _pull_task_change_subscriber_spawn: pull_task_change_subscriber,
463
1
        })
464
1
    }
465
466
1
    async fn try_subscribe(
467
1
        &self,
468
1
        client_operation_id: &ClientOperationId,
469
1
        unique_qualifier: &ActionUniqueQualifier,
470
1
        // TODO(allada) To simplify the scheduler 2024 refactor, we
471
1
        // removed the ability to upgrade priorities of actions.
472
1
        // we should add priority upgrades back in.
473
1
        _priority: i32,
474
1
    ) -> Result<Option<OperationSubscriber<S, I, NowFn>>, Error> {
475
1
        match unique_qualifier {
476
1
            ActionUniqueQualifier::Cachable(_) => {}
477
0
            ActionUniqueQualifier::Uncachable(_) => return Ok(None),
478
        }
479
1
        let stream = self
480
1
            .store
481
1
            .search_by_index_prefix(SearchUniqueQualifierToAwaitedAction(unique_qualifier))
482
1
            .await
483
1
            .err_tip(|| 
"In RedisAwaitedActionDb::try_subscribe"0
)
?0
;
484
1
        tokio::pin!(stream);
485
1
        let maybe_awaited_action = stream
486
1
            .try_next()
487
1
            .await
488
1
            .err_tip(|| 
"In RedisAwaitedActionDb::try_subscribe"0
)
?0
;
489
1
        match maybe_awaited_action {
490
0
            Some(mut awaited_action) => {
491
0
                // TODO(allada) We don't support joining completed jobs because we
492
0
                // need to also check that all the data is still in the cache.
493
0
                if awaited_action.state().stage.is_finished() {
  Branch (493:20): [True: 0, False: 0]
  Branch (493:20): [Folded - Ignored]
  Branch (493:20): [True: 0, False: 0]
494
0
                    return Ok(None);
495
0
                }
496
0
                // TODO(allada) We only care about the operation_id here, we should
497
0
                // have a way to tell the decoder we only care about specific fields.
498
0
                let operation_id = awaited_action.operation_id().clone();
499
0
500
0
                awaited_action.update_client_keep_alive((self.now_fn)().now());
501
0
                let update_res = inner_update_awaited_action(self.store.as_ref(), awaited_action)
502
0
                    .await
503
0
                    .err_tip(|| "In OperationSubscriber::changed");
504
0
                if let Err(err) = update_res {
  Branch (504:24): [True: 0, False: 0]
  Branch (504:24): [Folded - Ignored]
  Branch (504:24): [True: 0, False: 0]
505
0
                    event!(
506
0
                        Level::WARN,
507
0
                        "Error updating client keep alive in RedisAwaitedActionDb::try_subscribe - {err:?} - This is not a critical error, but we did decide to create a new action instead of joining an existing one."
508
                    );
509
0
                    return Ok(None);
510
0
                }
511
0
512
0
                Ok(Some(OperationSubscriber::new(
513
0
                    Some(client_operation_id.clone()),
514
0
                    OperationIdToAwaitedAction(Cow::Owned(operation_id)),
515
0
                    Arc::downgrade(&self.store),
516
0
                    self.now_fn.clone(),
517
0
                )))
518
            }
519
1
            None => Ok(None),
520
        }
521
1
    }
522
523
0
    async fn inner_get_awaited_action_by_id(
524
0
        &self,
525
0
        client_operation_id: &ClientOperationId,
526
0
    ) -> Result<Option<OperationSubscriber<S, I, NowFn>>, Error> {
527
0
        let maybe_operation_id = self
528
0
            .store
529
0
            .get_and_decode(ClientIdToOperationId(client_operation_id))
530
0
            .await
531
0
            .err_tip(|| "In RedisAwaitedActionDb::get_awaited_action_by_id")?;
532
0
        let Some(operation_id) = maybe_operation_id else {
  Branch (532:13): [True: 0, False: 0]
  Branch (532:13): [Folded - Ignored]
533
0
            return Ok(None);
534
        };
535
0
        Ok(Some(OperationSubscriber::new(
536
0
            Some(client_operation_id.clone()),
537
0
            OperationIdToAwaitedAction(Cow::Owned(operation_id)),
538
0
            Arc::downgrade(&self.store),
539
0
            self.now_fn.clone(),
540
0
        )))
541
0
    }
542
}
543
544
impl<S, F, I, NowFn> AwaitedActionDb for StoreAwaitedActionDb<S, F, I, NowFn>
545
where
546
    S: SchedulerStore,
547
    F: Fn() -> OperationId + Send + Sync + Unpin + 'static,
548
    I: InstantWrapper,
549
    NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static,
550
{
551
    type Subscriber = OperationSubscriber<S, I, NowFn>;
552
553
0
    async fn get_awaited_action_by_id(
554
0
        &self,
555
0
        client_operation_id: &ClientOperationId,
556
0
    ) -> Result<Option<Self::Subscriber>, Error> {
557
0
        self.inner_get_awaited_action_by_id(client_operation_id)
558
0
            .await
559
0
    }
560
561
0
    async fn get_by_operation_id(
562
0
        &self,
563
0
        operation_id: &OperationId,
564
0
    ) -> Result<Option<Self::Subscriber>, Error> {
565
0
        Ok(Some(OperationSubscriber::new(
566
0
            None,
567
0
            OperationIdToAwaitedAction(Cow::Owned(operation_id.clone())),
568
0
            Arc::downgrade(&self.store),
569
0
            self.now_fn.clone(),
570
0
        )))
571
0
    }
572
573
1
    async fn update_awaited_action(&self, new_awaited_action: AwaitedAction) -> Result<(), Error> {
574
1
        inner_update_awaited_action(self.store.as_ref(), new_awaited_action).await
575
1
    }
576
577
1
    async fn add_action(
578
1
        &self,
579
1
        client_operation_id: ClientOperationId,
580
1
        action_info: Arc<ActionInfo>,
581
1
    ) -> Result<Self::Subscriber, Error> {
582
        // Check to see if the action is already known and subscribe if it is.
583
1
        let subscription = self
584
1
            .try_subscribe(
585
1
                &client_operation_id,
586
1
                &action_info.unique_qualifier,
587
1
                action_info.priority,
588
1
            )
589
1
            .await
590
1
            .err_tip(|| 
"In RedisAwaitedActionDb::add_action"0
)
?0
;
591
1
        if let Some(
sub0
) = subscription {
  Branch (591:16): [True: 0, False: 0]
  Branch (591:16): [Folded - Ignored]
  Branch (591:16): [True: 0, False: 1]
592
0
            return Ok(sub);
593
1
        }
594
1
595
1
        let new_operation_id = (self.operation_id_creator)();
596
1
        let awaited_action =
597
1
            AwaitedAction::new(new_operation_id.clone(), action_info, (self.now_fn)().now());
598
1
        debug_assert!(
599
0
            ActionStage::Queued == awaited_action.state().stage,
600
0
            "Expected action to be queued"
601
        );
602
603
        // Note: Version is not needed with this api.
604
1
        let _version = self
605
1
            .store
606
1
            .update_data(UpdateOperationIdToAwaitedAction(awaited_action))
607
1
            .await
608
1
            .err_tip(|| 
"In RedisAwaitedActionDb::add_action"0
)
?0
609
1
            .err_tip(|| {
610
0
                "Version match failed for new action insert in RedisAwaitedActionDb::add_action"
611
1
            })
?0
;
612
613
1
        self.store
614
1
            .update_data(UpdateClientIdToOperationId {
615
1
                client_operation_id: client_operation_id.clone(),
616
1
                operation_id: new_operation_id.clone(),
617
1
            })
618
1
            .await
619
1
            .err_tip(|| 
"In RedisAwaitedActionDb::add_action"0
)
?0
;
620
621
1
        Ok(OperationSubscriber::new(
622
1
            Some(client_operation_id),
623
1
            OperationIdToAwaitedAction(Cow::Owned(new_operation_id)),
624
1
            Arc::downgrade(&self.store),
625
1
            self.now_fn.clone(),
626
1
        ))
627
1
    }
628
629
0
    async fn get_range_of_actions(
630
0
        &self,
631
0
        state: SortedAwaitedActionState,
632
0
        start: Bound<SortedAwaitedAction>,
633
0
        end: Bound<SortedAwaitedAction>,
634
0
        desc: bool,
635
0
    ) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>> + Send, Error> {
636
0
        if !matches!(start, Bound::Unbounded) {
  Branch (636:12): [True: 0, False: 0]
  Branch (636:12): [Folded - Ignored]
637
0
            return Err(make_err!(
638
0
                Code::Unimplemented,
639
0
                "Start bound is not supported in RedisAwaitedActionDb::get_range_of_actions",
640
0
            ));
641
0
        }
642
0
        if !matches!(end, Bound::Unbounded) {
  Branch (642:12): [True: 0, False: 0]
  Branch (642:12): [Folded - Ignored]
643
0
            return Err(make_err!(
644
0
                Code::Unimplemented,
645
0
                "Start bound is not supported in RedisAwaitedActionDb::get_range_of_actions",
646
0
            ));
647
0
        }
648
0
        // TODO(allada) This API is not difficult to implement, but there is no code path
649
0
        // that uses it, so no reason to implement it yet.
650
0
        if !desc {
  Branch (650:12): [True: 0, False: 0]
  Branch (650:12): [Folded - Ignored]
651
0
            return Err(make_err!(
652
0
                Code::Unimplemented,
653
0
                "Descending order is not supported in RedisAwaitedActionDb::get_range_of_actions",
654
0
            ));
655
0
        }
656
0
        Ok(self
657
0
            .store
658
0
            .search_by_index_prefix(SearchStateToAwaitedAction(get_state_prefix(state)))
659
0
            .await
660
0
            .err_tip(|| "In RedisAwaitedActionDb::get_range_of_actions")?
661
0
            .map_ok(move |awaited_action| {
662
0
                OperationSubscriber::new(
663
0
                    None,
664
0
                    OperationIdToAwaitedAction(Cow::Owned(awaited_action.operation_id().clone())),
665
0
                    Arc::downgrade(&self.store),
666
0
                    self.now_fn.clone(),
667
0
                )
668
0
            }))
669
0
    }
670
671
0
    async fn get_all_awaited_actions(
672
0
        &self,
673
0
    ) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>>, Error> {
674
0
        Ok(self
675
0
            .store
676
0
            .search_by_index_prefix(SearchStateToAwaitedAction(""))
677
0
            .await
678
0
            .err_tip(|| "In RedisAwaitedActionDb::get_range_of_actions")?
679
0
            .map_ok(move |awaited_action| {
680
0
                OperationSubscriber::new(
681
0
                    None,
682
0
                    OperationIdToAwaitedAction(Cow::Owned(awaited_action.operation_id().clone())),
683
0
                    Arc::downgrade(&self.store),
684
0
                    self.now_fn.clone(),
685
0
                )
686
0
            }))
687
0
    }
688
}