Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-scheduler/src/store_awaited_action_db.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::ops::Bound;
17
use std::sync::{Arc, Weak};
18
use std::time::{Duration, SystemTime};
19
20
use bytes::Bytes;
21
use futures::{Stream, TryStreamExt};
22
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
23
use nativelink_metric::MetricsComponent;
24
use nativelink_util::action_messages::{
25
    ActionInfo, ActionStage, ActionUniqueQualifier, OperationId,
26
};
27
use nativelink_util::spawn;
28
use nativelink_util::store_trait::{
29
    FalseValue, SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore,
30
    SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
31
    SchedulerSubscription, SchedulerSubscriptionManager, StoreKey, TrueValue,
32
};
33
use nativelink_util::task::JoinHandleDropGuard;
34
use tokio::sync::Notify;
35
use tracing::{event, Level};
36
37
use crate::awaited_action_db::{
38
    AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedAction,
39
    SortedAwaitedActionState,
40
};
41
42
type ClientOperationId = OperationId;
43
44
enum OperationSubscriberState<Sub> {
45
    Unsubscribed,
46
    Subscribed(Sub),
47
}
48
49
pub struct OperationSubscriber<S: SchedulerStore> {
50
    maybe_client_operation_id: Option<ClientOperationId>,
51
    subscription_key: OperationIdToAwaitedAction<'static>,
52
    weak_store: Weak<S>,
53
    state: OperationSubscriberState<
54
        <S::SubscriptionManager as SchedulerSubscriptionManager>::Subscription,
55
    >,
56
    now_fn: fn() -> SystemTime,
57
}
58
impl<S: SchedulerStore> OperationSubscriber<S> {
59
1
    fn new(
60
1
        maybe_client_operation_id: Option<ClientOperationId>,
61
1
        subscription_key: OperationIdToAwaitedAction<'static>,
62
1
        weak_store: Weak<S>,
63
1
        now_fn: fn() -> SystemTime,
64
1
    ) -> Self {
65
1
        Self {
66
1
            maybe_client_operation_id,
67
1
            subscription_key,
68
1
            weak_store,
69
1
            state: OperationSubscriberState::Unsubscribed,
70
1
            now_fn,
71
1
        }
72
1
    }
73
74
2
    async fn get_awaited_action(&self) -> Result<AwaitedAction, Error> {
75
2
        let store = self
76
2
            .weak_store
77
2
            .upgrade()
78
2
            .err_tip(|| 
"Store gone in OperationSubscriber::get_awaited_action"0
)
?0
;
79
2
        let key = self.subscription_key.borrow();
80
2
        let mut awaited_action = store
81
2
            .get_and_decode(key.borrow())
82
2
            .await
83
2
            .err_tip(|| 
format!("In OperationSubscriber::get_awaited_action {key:?}")0
)
?0
84
2
            .ok_or_else(|| {
85
0
                make_err!(
86
0
                    Code::NotFound,
87
0
                    "Could not find AwaitedAction for the given operation id {key:?}",
88
0
                )
89
2
            })
?0
;
90
2
        if let Some(client_operation_id) = &self.maybe_client_operation_id {
  Branch (90:16): [True: 0, False: 0]
  Branch (90:16): [Folded - Ignored]
  Branch (90:16): [True: 2, False: 0]
91
2
            let mut state = awaited_action.state().as_ref().clone();
92
2
            state.client_operation_id = client_operation_id.clone();
93
2
            awaited_action.set_state(Arc::new(state), Some((self.now_fn)()));
94
2
        }
0
95
2
        Ok(awaited_action)
96
2
    }
97
}
98
99
impl<S: SchedulerStore> AwaitedActionSubscriber for OperationSubscriber<S> {
100
2
    async fn changed(&mut self) -> Result<AwaitedAction, Error> {
101
2
        let store = self
102
2
            .weak_store
103
2
            .upgrade()
104
2
            .err_tip(|| 
"Store gone in OperationSubscriber::get_awaited_action"0
)
?0
;
105
2
        let subscription = match &mut self.state {
106
            OperationSubscriberState::Unsubscribed => {
107
1
                let subscription = store
108
1
                    .subscription_manager()
109
1
                    .err_tip(|| 
"In OperationSubscriber::changed::subscription_manager"0
)
?0
110
1
                    .subscribe(self.subscription_key.borrow())
111
1
                    .err_tip(|| 
"In OperationSubscriber::changed::subscribe"0
)
?0
;
112
1
                self.state = OperationSubscriberState::Subscribed(subscription);
113
1
                let OperationSubscriberState::Subscribed(subscription) = &mut self.state else {
  Branch (113:21): [True: 0, False: 0]
  Branch (113:21): [Folded - Ignored]
  Branch (113:21): [True: 1, False: 0]
114
0
                    unreachable!("Subscription should be in Subscribed state");
115
                };
116
1
                subscription
117
            }
118
1
            OperationSubscriberState::Subscribed(subscription) => subscription,
119
        };
120
2
        subscription
121
2
            .changed()
122
2
            .await
123
2
            .err_tip(|| 
"In OperationSubscriber::changed"0
)
?0
;
124
2
        self.get_awaited_action()
125
2
            .await
126
2
            .err_tip(|| 
"In OperationSubscriber::changed"0
)
127
2
    }
128
129
0
    async fn borrow(&self) -> Result<AwaitedAction, Error> {
130
0
        self.get_awaited_action()
131
0
            .await
132
0
            .err_tip(|| "In OperationSubscriber::borrow")
133
0
    }
134
}
135
136
2
fn awaited_action_decode(version: u64, data: &Bytes) -> Result<AwaitedAction, Error> {
137
2
    let mut awaited_action: AwaitedAction = serde_json::from_slice(data)
138
2
        .map_err(|e| 
make_input_err!("In AwaitedAction::decode - {e:?}")0
)
?0
;
139
2
    awaited_action.set_version(version);
140
2
    Ok(awaited_action)
141
2
}
142
143
const OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX: &str = "aa_";
144
const CLIENT_ID_TO_OPERATION_ID_KEY_PREFIX: &str = "cid_";
145
146
#[derive(Debug)]
147
struct OperationIdToAwaitedAction<'a>(Cow<'a, OperationId>);
148
impl OperationIdToAwaitedAction<'_> {
149
5
    fn borrow(&self) -> OperationIdToAwaitedAction<'_> {
150
5
        match self.0 {
151
2
            Cow::Borrowed(operation_id) => OperationIdToAwaitedAction(Cow::Borrowed(operation_id)),
152
3
            Cow::Owned(ref operation_id) => OperationIdToAwaitedAction(Cow::Borrowed(operation_id)),
153
        }
154
5
    }
155
}
156
impl SchedulerStoreKeyProvider for OperationIdToAwaitedAction<'_> {
157
    type Versioned = TrueValue;
158
6
    fn get_key(&self) -> StoreKey<'static> {
159
6
        StoreKey::Str(Cow::Owned(format!(
160
6
            "{OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX}{}",
161
6
            self.0
162
6
        )))
163
6
    }
164
}
165
impl SchedulerStoreDecodeTo for OperationIdToAwaitedAction<'_> {
166
    type DecodeOutput = AwaitedAction;
167
2
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
168
2
        awaited_action_decode(version, &data)
169
2
    }
170
}
171
172
struct ClientIdToOperationId<'a>(&'a OperationId);
173
impl SchedulerStoreKeyProvider for ClientIdToOperationId<'_> {
174
    type Versioned = FalseValue;
175
1
    fn get_key(&self) -> StoreKey<'static> {
176
1
        StoreKey::Str(Cow::Owned(format!(
177
1
            "{CLIENT_ID_TO_OPERATION_ID_KEY_PREFIX}{}",
178
1
            self.0
179
1
        )))
180
1
    }
181
}
182
impl SchedulerStoreDecodeTo for ClientIdToOperationId<'_> {
183
    type DecodeOutput = OperationId;
184
0
    fn decode(_version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
185
0
        OperationId::try_from(data).err_tip(|| "In ClientIdToOperationId::decode")
186
0
    }
187
}
188
189
// TODO(allada) We only need operation_id here, it would be nice if we had a way
190
// to tell the decoder we only care about specific fields.
191
struct SearchUniqueQualifierToAwaitedAction<'a>(&'a ActionUniqueQualifier);
192
impl SchedulerIndexProvider for SearchUniqueQualifierToAwaitedAction<'_> {
193
    const KEY_PREFIX: &'static str = OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX;
194
    const INDEX_NAME: &'static str = "unique_qualifier";
195
    type Versioned = TrueValue;
196
1
    fn index_value_prefix(&self) -> Cow<'_, str> {
197
1
        Cow::Owned(format!("{}", self.0))
198
1
    }
199
}
200
impl SchedulerStoreDecodeTo for SearchUniqueQualifierToAwaitedAction<'_> {
201
    type DecodeOutput = AwaitedAction;
202
0
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
203
0
        awaited_action_decode(version, &data)
204
0
    }
205
}
206
207
struct SearchSortKeyPrefixToAwaitedAction(&'static str);
208
impl SchedulerIndexProvider for SearchSortKeyPrefixToAwaitedAction {
209
    const KEY_PREFIX: &'static str = OPERATION_ID_TO_AWAITED_ACTION_KEY_PREFIX;
210
    const INDEX_NAME: &'static str = "sort_key";
211
    type Versioned = TrueValue;
212
0
    fn index_value_prefix(&self) -> Cow<'_, str> {
213
0
        Cow::Borrowed(self.0)
214
0
    }
215
}
216
impl SchedulerStoreDecodeTo for SearchSortKeyPrefixToAwaitedAction {
217
    type DecodeOutput = AwaitedAction;
218
0
    fn decode(version: u64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
219
0
        awaited_action_decode(version, &data)
220
0
    }
221
}
222
223
2
fn get_state_prefix(state: SortedAwaitedActionState) -> &'static str {
224
2
    match state {
225
0
        SortedAwaitedActionState::CacheCheck => "x_",
226
1
        SortedAwaitedActionState::Queued => "q_",
227
1
        SortedAwaitedActionState::Executing => "e_",
228
0
        SortedAwaitedActionState::Completed => "c_",
229
    }
230
2
}
231
232
struct UpdateOperationIdToAwaitedAction(AwaitedAction);
233
impl SchedulerCurrentVersionProvider for UpdateOperationIdToAwaitedAction {
234
2
    fn current_version(&self) -> u64 {
235
2
        self.0.version()
236
2
    }
237
}
238
impl SchedulerStoreKeyProvider for UpdateOperationIdToAwaitedAction {
239
    type Versioned = TrueValue;
240
2
    fn get_key(&self) -> StoreKey<'static> {
241
2
        OperationIdToAwaitedAction(Cow::Borrowed(self.0.operation_id())).get_key()
242
2
    }
243
}
244
impl SchedulerStoreDataProvider for UpdateOperationIdToAwaitedAction {
245
2
    fn try_into_bytes(self) -> Result<Bytes, Error> {
246
2
        serde_json::to_string(&self.0)
247
2
            .map(Bytes::from)
248
2
            .map_err(|e| 
make_input_err!("Could not convert AwaitedAction to json - {e:?}")0
)
249
2
    }
250
2
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
251
2
        let unique_qualifier = &self.0.action_info().unique_qualifier;
252
2
        let maybe_unique_qualifier = match &unique_qualifier {
253
2
            ActionUniqueQualifier::Cachable(_) => Some(unique_qualifier),
254
0
            ActionUniqueQualifier::Uncachable(_) => None,
255
        };
256
2
        let mut output = Vec::with_capacity(1 + maybe_unique_qualifier.map_or(0, |_| 1));
257
2
        if maybe_unique_qualifier.is_some() {
  Branch (257:12): [True: 2, False: 0]
  Branch (257:12): [Folded - Ignored]
258
2
            output.push((
259
2
                "unique_qualifier",
260
2
                Bytes::from(unique_qualifier.to_string()),
261
2
            ));
262
2
        }
0
263
        {
264
2
            let state = SortedAwaitedActionState::try_from(&self.0.state().stage)
265
2
                .err_tip(|| 
"In UpdateOperationIdToAwaitedAction::get_index"0
)
?0
;
266
2
            let sorted_awaited_action = SortedAwaitedAction::from(&self.0);
267
2
            output.push((
268
2
                "sort_key",
269
2
                Bytes::from(format!(
270
2
                    "{}{}",
271
2
                    get_state_prefix(state),
272
2
                    sorted_awaited_action.sort_key.as_u64(),
273
2
                )),
274
2
            ));
275
2
        }
276
2
        Ok(output)
277
2
    }
278
}
279
280
struct UpdateClientIdToOperationId {
281
    client_operation_id: ClientOperationId,
282
    operation_id: OperationId,
283
}
284
impl SchedulerStoreKeyProvider for UpdateClientIdToOperationId {
285
    type Versioned = FalseValue;
286
1
    fn get_key(&self) -> StoreKey<'static> {
287
1
        ClientIdToOperationId(&self.client_operation_id).get_key()
288
1
    }
289
}
290
impl SchedulerStoreDataProvider for UpdateClientIdToOperationId {
291
1
    fn try_into_bytes(self) -> Result<Bytes, Error> {
292
1
        serde_json::to_string(&self.operation_id)
293
1
            .map(Bytes::from)
294
1
            .map_err(|e| 
make_input_err!("Could not convert OperationId to json - {e:?}")0
)
295
1
    }
296
}
297
298
0
#[derive(MetricsComponent)]
299
pub struct StoreAwaitedActionDb<S: SchedulerStore, F: Fn() -> OperationId> {
300
    store: Arc<S>,
301
    now_fn: fn() -> SystemTime,
302
    operation_id_creator: F,
303
    _pull_task_change_subscriber_spawn: JoinHandleDropGuard<()>,
304
}
305
306
impl<S: SchedulerStore, F: Fn() -> OperationId> StoreAwaitedActionDb<S, F> {
307
1
    pub fn new(
308
1
        store: Arc<S>,
309
1
        task_change_publisher: Arc<Notify>,
310
1
        now_fn: fn() -> SystemTime,
311
1
        operation_id_creator: F,
312
1
    ) -> Result<Self, Error> {
313
1
        let mut subscription = store
314
1
            .subscription_manager()
315
1
            .err_tip(|| 
"In RedisAwaitedActionDb::new"0
)
?0
316
1
            .subscribe(OperationIdToAwaitedAction(Cow::Owned(OperationId::String(
317
1
                String::new(),
318
1
            ))))
319
1
            .err_tip(|| 
"In RedisAwaitedActionDb::new"0
)
?0
;
320
1
        let pull_task_change_subscriber = spawn!(
321
1
            "redis_awaited_action_db_pull_task_change_subscriber",
322
1
            async move {
323
                loop {
324
5
                    let 
changed_res4
= subscription
325
5
                        .changed()
326
3
                        .await
327
4
                        .err_tip(|| 
"In RedisAwaitedActionDb::new"0
);
328
4
                    if let Err(
err0
) = changed_res {
  Branch (328:28): [True: 0, False: 0]
  Branch (328:28): [Folded - Ignored]
  Branch (328:28): [True: 0, False: 4]
329
0
                        event!(
330
0
                            Level::ERROR,
331
0
                            "Error waiting for pull task change subscriber in RedisAwaitedActionDb::new  - {err:?}"
332
                        );
333
                        // Sleep for a second to avoid a busy loop, then trigger the notify
334
                        // so if a reconnect happens we let local resources know that things
335
                        // might have changed.
336
0
                        tokio::time::sleep(Duration::from_secs(1)).await;
337
4
                    }
338
4
                    task_change_publisher.as_ref().notify_one();
339
                }
340
1
            }
341
1
        );
342
1
        Ok(Self {
343
1
            store,
344
1
            now_fn,
345
1
            operation_id_creator,
346
1
            _pull_task_change_subscriber_spawn: pull_task_change_subscriber,
347
1
        })
348
1
    }
349
350
1
    async fn try_subscribe(
351
1
        &self,
352
1
        client_operation_id: &ClientOperationId,
353
1
        unique_qualifier: &ActionUniqueQualifier,
354
1
        // TODO(allada) To simplify the scheduler 2024 refactor, we
355
1
        // removed the ability to upgrade priorities of actions.
356
1
        // we should add priority upgrades back in.
357
1
        _priority: i32,
358
1
    ) -> Result<Option<OperationSubscriber<S>>, Error> {
359
1
        match unique_qualifier {
360
1
            ActionUniqueQualifier::Cachable(_) => {}
361
0
            ActionUniqueQualifier::Uncachable(_) => return Ok(None),
362
        }
363
1
        let stream = self
364
1
            .store
365
1
            .search_by_index_prefix(SearchUniqueQualifierToAwaitedAction(unique_qualifier))
366
3
            .await
367
1
            .err_tip(|| 
"In RedisAwaitedActionDb::try_subscribe"0
)
?0
;
368
1
        tokio::pin!(stream);
369
1
        let maybe_awaited_action = stream
370
1
            .try_next()
371
0
            .await
372
1
            .err_tip(|| 
"In RedisAwaitedActionDb::try_subscribe"0
)
?0
;
373
1
        match maybe_awaited_action {
374
0
            Some(awaited_action) => {
375
0
                // TODO(allada) We don't support joining completed jobs because we
376
0
                // need to also check that all the data is still in the cache.
377
0
                if awaited_action.state().stage.is_finished() {
  Branch (377:20): [True: 0, False: 0]
  Branch (377:20): [Folded - Ignored]
  Branch (377:20): [True: 0, False: 0]
378
0
                    return Ok(None);
379
0
                }
380
0
                // TODO(allada) We only care about the operation_id here, we should
381
0
                // have a way to tell the decoder we only care about specific fields.
382
0
                let operation_id = awaited_action.operation_id();
383
0
                Ok(Some(OperationSubscriber::new(
384
0
                    Some(client_operation_id.clone()),
385
0
                    OperationIdToAwaitedAction(Cow::Owned(operation_id.clone())),
386
0
                    Arc::downgrade(&self.store),
387
0
                    self.now_fn,
388
0
                )))
389
            }
390
1
            None => Ok(None),
391
        }
392
1
    }
393
394
0
    async fn inner_get_awaited_action_by_id(
395
0
        &self,
396
0
        client_operation_id: &ClientOperationId,
397
0
    ) -> Result<Option<OperationSubscriber<S>>, Error> {
398
0
        let maybe_operation_id = self
399
0
            .store
400
0
            .get_and_decode(ClientIdToOperationId(client_operation_id))
401
0
            .await
402
0
            .err_tip(|| "In RedisAwaitedActionDb::get_awaited_action_by_id")?;
403
0
        let Some(operation_id) = maybe_operation_id else {
  Branch (403:13): [True: 0, False: 0]
  Branch (403:13): [Folded - Ignored]
404
0
            return Ok(None);
405
        };
406
0
        Ok(Some(OperationSubscriber::new(
407
0
            Some(client_operation_id.clone()),
408
0
            OperationIdToAwaitedAction(Cow::Owned(operation_id)),
409
0
            Arc::downgrade(&self.store),
410
0
            self.now_fn,
411
0
        )))
412
0
    }
413
}
414
415
impl<S: SchedulerStore, F: Fn() -> OperationId + Send + Sync + Unpin + 'static> AwaitedActionDb
416
    for StoreAwaitedActionDb<S, F>
417
{
418
    type Subscriber = OperationSubscriber<S>;
419
420
0
    async fn get_awaited_action_by_id(
421
0
        &self,
422
0
        client_operation_id: &ClientOperationId,
423
0
    ) -> Result<Option<Self::Subscriber>, Error> {
424
0
        self.inner_get_awaited_action_by_id(client_operation_id)
425
0
            .await
426
0
    }
427
428
0
    async fn get_by_operation_id(
429
0
        &self,
430
0
        operation_id: &OperationId,
431
0
    ) -> Result<Option<Self::Subscriber>, Error> {
432
0
        Ok(Some(OperationSubscriber::new(
433
0
            None,
434
0
            OperationIdToAwaitedAction(Cow::Owned(operation_id.clone())),
435
0
            Arc::downgrade(&self.store),
436
0
            self.now_fn,
437
0
        )))
438
0
    }
439
440
1
    async fn update_awaited_action(&self, new_awaited_action: AwaitedAction) -> Result<(), Error> {
441
1
        let operation_id = new_awaited_action.operation_id().clone();
442
1
        let maybe_version = self
443
1
            .store
444
1
            .update_data(UpdateOperationIdToAwaitedAction(new_awaited_action))
445
2
            .await
446
1
            .err_tip(|| 
"In RedisAwaitedActionDb::update_awaited_action"0
)
?0
;
447
1
        if maybe_version.is_none() {
  Branch (447:12): [True: 0, False: 0]
  Branch (447:12): [Folded - Ignored]
  Branch (447:12): [True: 0, False: 1]
448
0
            return Err(make_err!(
449
0
                Code::Aborted,
450
0
                "Could not update AwaitedAction because the version did not match for {operation_id:?}",
451
0
            ));
452
1
        }
453
1
        Ok(())
454
1
    }
455
456
1
    async fn add_action(
457
1
        &self,
458
1
        client_operation_id: ClientOperationId,
459
1
        action_info: Arc<ActionInfo>,
460
1
    ) -> Result<Self::Subscriber, Error> {
461
        // Check to see if the action is already known and subscribe if it is.
462
1
        let subscription = self
463
1
            .try_subscribe(
464
1
                &client_operation_id,
465
1
                &action_info.unique_qualifier,
466
1
                action_info.priority,
467
1
            )
468
3
            .await
469
1
            .err_tip(|| 
"In RedisAwaitedActionDb::add_action"0
)
?0
;
470
1
        if let Some(
sub0
) = subscription {
  Branch (470:16): [True: 0, False: 0]
  Branch (470:16): [Folded - Ignored]
  Branch (470:16): [True: 0, False: 1]
471
0
            return Ok(sub);
472
1
        }
473
1
474
1
        let new_operation_id = (self.operation_id_creator)();
475
1
        let awaited_action =
476
1
            AwaitedAction::new(new_operation_id.clone(), action_info, (self.now_fn)());
477
1
        debug_assert!(
478
0
            ActionStage::Queued == awaited_action.state().stage,
479
0
            "Expected action to be queued"
480
        );
481
482
        // Note: Version is not needed with this api.
483
1
        let _version = self
484
1
            .store
485
1
            .update_data(UpdateOperationIdToAwaitedAction(awaited_action))
486
2
            .await
487
1
            .err_tip(|| 
"In RedisAwaitedActionDb::add_action"0
)
?0
488
1
            .err_tip(|| {
489
0
                "Version match failed for new action insert in RedisAwaitedActionDb::add_action"
490
1
            })
?0
;
491
492
1
        self.store
493
1
            .update_data(UpdateClientIdToOperationId {
494
1
                client_operation_id: client_operation_id.clone(),
495
1
                operation_id: new_operation_id.clone(),
496
1
            })
497
2
            .await
498
1
            .err_tip(|| 
"In RedisAwaitedActionDb::add_action"0
)
?0
;
499
500
1
        Ok(OperationSubscriber::new(
501
1
            Some(client_operation_id),
502
1
            OperationIdToAwaitedAction(Cow::Owned(new_operation_id)),
503
1
            Arc::downgrade(&self.store),
504
1
            self.now_fn,
505
1
        ))
506
1
    }
507
508
0
    async fn get_range_of_actions(
509
0
        &self,
510
0
        state: SortedAwaitedActionState,
511
0
        start: Bound<SortedAwaitedAction>,
512
0
        end: Bound<SortedAwaitedAction>,
513
0
        desc: bool,
514
0
    ) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>> + Send, Error> {
515
0
        if !matches!(start, Bound::Unbounded) {
  Branch (515:12): [True: 0, False: 0]
  Branch (515:12): [Folded - Ignored]
516
0
            return Err(make_err!(
517
0
                Code::Unimplemented,
518
0
                "Start bound is not supported in RedisAwaitedActionDb::get_range_of_actions",
519
0
            ));
520
0
        }
521
0
        if !matches!(end, Bound::Unbounded) {
  Branch (521:12): [True: 0, False: 0]
  Branch (521:12): [Folded - Ignored]
522
0
            return Err(make_err!(
523
0
                Code::Unimplemented,
524
0
                "Start bound is not supported in RedisAwaitedActionDb::get_range_of_actions",
525
0
            ));
526
0
        }
527
0
        // TODO(allada) This API is not difficult to implement, but there is no code path
528
0
        // that uses it, so no reason to implement it yet.
529
0
        if !desc {
  Branch (529:12): [True: 0, False: 0]
  Branch (529:12): [Folded - Ignored]
530
0
            return Err(make_err!(
531
0
                Code::Unimplemented,
532
0
                "Descending order is not supported in RedisAwaitedActionDb::get_range_of_actions",
533
0
            ));
534
0
        }
535
0
        Ok(self
536
0
            .store
537
0
            .search_by_index_prefix(SearchSortKeyPrefixToAwaitedAction(get_state_prefix(state)))
538
0
            .await
539
0
            .err_tip(|| "In RedisAwaitedActionDb::get_range_of_actions")?
540
0
            .map_ok(move |awaited_action| {
541
0
                OperationSubscriber::new(
542
0
                    None,
543
0
                    OperationIdToAwaitedAction(Cow::Owned(awaited_action.operation_id().clone())),
544
0
                    Arc::downgrade(&self.store),
545
0
                    self.now_fn,
546
0
                )
547
0
            }))
548
0
    }
549
550
0
    async fn get_all_awaited_actions(
551
0
        &self,
552
0
    ) -> Result<impl Stream<Item = Result<Self::Subscriber, Error>>, Error> {
553
0
        Ok(self
554
0
            .store
555
0
            .search_by_index_prefix(SearchSortKeyPrefixToAwaitedAction(""))
556
0
            .await
557
0
            .err_tip(|| "In RedisAwaitedActionDb::get_range_of_actions")?
558
0
            .map_ok(move |awaited_action| {
559
0
                OperationSubscriber::new(
560
0
                    None,
561
0
                    OperationIdToAwaitedAction(Cow::Owned(awaited_action.operation_id().clone())),
562
0
                    Arc::downgrade(&self.store),
563
0
                    self.now_fn,
564
0
                )
565
0
            }))
566
0
    }
567
}