Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-store/src/redis_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::cmp;
17
use std::pin::Pin;
18
use std::sync::{Arc, Weak};
19
use std::time::Duration;
20
21
use async_trait::async_trait;
22
use bytes::Bytes;
23
use const_format::formatcp;
24
use fred::clients::{RedisClient, RedisPool, SubscriberClient};
25
use fred::interfaces::{ClientLike, KeysInterface, PubsubInterface};
26
use fred::prelude::{EventInterface, HashesInterface, RediSearchInterface};
27
use fred::types::{
28
    Builder, ConnectionConfig, FtCreateOptions, PerformanceConfig, ReconnectPolicy, RedisConfig,
29
    RedisKey, RedisMap, RedisValue, Script, SearchSchema, SearchSchemaKind,
30
};
31
use futures::{FutureExt, Stream, StreamExt};
32
use nativelink_config::stores::RedisMode;
33
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
34
use nativelink_metric::MetricsComponent;
35
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
36
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
37
use nativelink_util::spawn;
38
use nativelink_util::store_trait::{
39
    BoolValue, SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore,
40
    SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
41
    SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,
42
};
43
use nativelink_util::task::JoinHandleDropGuard;
44
use parking_lot::{Mutex, RwLock};
45
use patricia_tree::StringPatriciaMap;
46
use tokio::select;
47
use tokio::time::sleep;
48
use tracing::{event, Level};
49
use uuid::Uuid;
50
51
use crate::cas_utils::is_zero_digest;
52
use crate::redis_utils::ft_aggregate;
53
54
// TODO(caass): These (and other settings) should be made configurable via nativelink-config.
55
pub const READ_CHUNK_SIZE: usize = 64 * 1024;
56
const CONNECTION_POOL_SIZE: usize = 3;
57
58
#[allow(clippy::trivially_copy_pass_by_ref)]
59
1
fn to_hex(value: &u32) -> String {
60
1
    format!("{value:08x}")
61
1
}
62
63
/// A [`StoreDriver`] implementation that uses Redis as a backing store.
64
1
#[derive(MetricsComponent)]
65
pub struct RedisStore {
66
    /// The client pool connecting to the backing Redis instance(s).
67
    client_pool: RedisPool,
68
69
    /// A channel to publish updates to when a key is added, removed, or modified.
70
    #[metric(
71
        help = "The pubsub channel to publish updates to when a key is added, removed, or modified"
72
    )]
73
    pub_sub_channel: Option<String>,
74
75
    /// A redis client for managing subscriptions.
76
    /// TODO: This should be moved into the store in followups once a standard use pattern has been determined.
77
    subscriber_client: SubscriberClient,
78
79
    /// For metrics only.
80
    #[metric(
81
        help = "A unique identifier for the FT.CREATE command used to create the index template",
82
        handler = to_hex
83
    )]
84
    fingerprint_create_index: u32,
85
86
    /// A function used to generate names for temporary keys.
87
    temp_name_generator_fn: fn() -> String,
88
89
    /// A common prefix to append to all keys before they are sent to Redis.
90
    ///
91
    /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`).
92
    #[metric(help = "Prefix to append to all keys before sending to Redis")]
93
    key_prefix: String,
94
95
    /// Redis script used to update a value in redis if the version matches.
96
    /// This is done by incrementing the version number and then setting the new data
97
    /// only if the version number matches the existing version number.
98
    update_if_version_matches_script: Script,
99
100
    /// A manager for subscriptions to keys in Redis.
101
    subscription_manager: Mutex<Option<Arc<RedisSubscriptionManager>>>,
102
}
103
104
impl RedisStore {
105
    /// Create a new `RedisStore` from the given configuration.
106
0
    pub fn new(config: &nativelink_config::stores::RedisStore) -> Result<Arc<Self>, Error> {
107
0
        if config.addresses.is_empty() {
  Branch (107:12): [True: 0, False: 0]
  Branch (107:12): [Folded - Ignored]
108
0
            return Err(make_err!(
109
0
                Code::InvalidArgument,
110
0
                "No addresses were specified in redis store configuration."
111
0
            ));
112
0
        };
113
0
        let [addr] = config.addresses.as_slice() else {
  Branch (113:13): [True: 0, False: 0]
  Branch (113:13): [Folded - Ignored]
114
0
            return Err(make_err!(Code::Unimplemented, "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discovery to find the other nodes."));
115
        };
116
0
        let redis_config = match config.mode {
117
0
            RedisMode::Cluster => RedisConfig::from_url_clustered(addr),
118
0
            RedisMode::Sentinel => RedisConfig::from_url_sentinel(addr),
119
0
            RedisMode::Standard => RedisConfig::from_url_centralized(addr),
120
        }
121
0
        .err_tip_with_code(|e| {
122
0
            (
123
0
                Code::InvalidArgument,
124
0
                format!("while parsing redis node address: {e}"),
125
0
            )
126
0
        })?;
127
128
0
        let mut builder = Builder::from_config(redis_config);
129
0
        builder
130
0
            .set_performance_config(PerformanceConfig {
131
0
                default_command_timeout: Duration::from_secs(config.response_timeout_s),
132
0
                ..Default::default()
133
0
            })
134
0
            .set_connection_config(ConnectionConfig {
135
0
                connection_timeout: Duration::from_secs(config.connection_timeout_s),
136
0
                internal_command_timeout: Duration::from_secs(config.response_timeout_s),
137
0
                ..Default::default()
138
0
            })
139
0
            // TODO(caass): Make this configurable.
140
0
            .set_policy(ReconnectPolicy::new_constant(1, 0));
141
0
142
0
        Self::new_from_builder_and_parts(
143
0
            builder,
144
0
            config.experimental_pub_sub_channel.clone(),
145
0
            || Uuid::new_v4().to_string(),
146
0
            config.key_prefix.clone(),
147
0
        )
148
0
        .map(Arc::new)
149
0
    }
150
151
    /// Used for testing when determinism is required.
152
10
    pub fn new_from_builder_and_parts(
153
10
        mut builder: Builder,
154
10
        pub_sub_channel: Option<String>,
155
10
        temp_name_generator_fn: fn() -> String,
156
10
        key_prefix: String,
157
10
    ) -> Result<Self, Error> {
158
10
        let client_pool = builder
159
10
            .set_performance_config(PerformanceConfig {
160
10
                broadcast_channel_capacity: 4096,
161
10
                ..Default::default()
162
10
            })
163
10
            .build_pool(CONNECTION_POOL_SIZE)
164
10
            .err_tip(|| "while creating redis connection pool")?;
165
166
10
        let subscriber_client = builder
167
10
            .build_subscriber_client()
168
10
            .err_tip(|| "while creating redis subscriber client")?;
169
        // Fires off a background task using `tokio::spawn`.
170
10
        client_pool.connect();
171
10
        subscriber_client.connect();
172
10
173
10
        Ok(Self {
174
10
            client_pool,
175
10
            pub_sub_channel,
176
10
            subscriber_client,
177
10
            fingerprint_create_index: fingerprint_create_index_template(),
178
10
            temp_name_generator_fn,
179
10
            key_prefix,
180
10
            update_if_version_matches_script: Script::from_lua(LUA_VERSION_SET_SCRIPT),
181
10
            subscription_manager: Mutex::new(None),
182
10
        })
183
10
    }
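
    /// A minimal sketch of how a deterministic test might construct this
    /// store via the function above. The channel name, temp-name generator,
    /// and key prefix here are hypothetical, and
    /// `Builder::default_centralized()` assumes a local Redis on the default
    /// port.
    #[cfg(test)]
    fn demo_store_for_tests() -> Result<Self, Error> {
        Self::new_from_builder_and_parts(
            Builder::default_centralized(),
            Some("demo-channel".to_string()),
            || "fixed-temp-name".to_string(),
            "demo-prefix:".to_string(),
        )
    }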
184
185
    /// Encode a [`StoreKey`] so it can be sent to Redis.
186
21
    fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> {
187
21
        let key_body = key.as_str();
188
21
        if self.key_prefix.is_empty() {
  Branch (188:12): [True: 17, False: 4]
  Branch (188:12): [Folded - Ignored]
189
17
            key_body
190
        } else {
191
            // This is in the hot path for all redis operations, so we try to reuse the allocation
192
            // from `key.as_str()` if possible.
193
4
            match key_body {
194
4
                Cow::Owned(mut encoded_key) => {
195
4
                    encoded_key.insert_str(0, &self.key_prefix);
196
4
                    Cow::Owned(encoded_key)
197
                }
198
0
                Cow::Borrowed(body) => {
199
0
                    let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());
200
0
                    encoded_key.push_str(&self.key_prefix);
201
0
                    encoded_key.push_str(body);
202
0
                    Cow::Owned(encoded_key)
203
                }
204
            }
205
        }
206
21
    }
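
    /// A hypothetical standalone mirror of the prefixing logic above, for
    /// illustration: with `key_prefix = "demo-prefix:"` and key body `"abc"`,
    /// the key sent to Redis is `"demo-prefix:abc"`.
    #[cfg(test)]
    fn demo_encoded_shape(prefix: &str, body: &str) -> String {
        let mut encoded = String::with_capacity(prefix.len() + body.len());
        // The prefix always comes first, followed by the unmodified key body.
        encoded.push_str(prefix);
        encoded.push_str(body);
        encoded
    }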
207
208
0
    pub fn get_client(&self) -> RedisClient {
209
0
        self.client_pool.next().clone()
210
0
    }
211
}
212
213
#[async_trait]
214
impl StoreDriver for RedisStore {
215
    async fn has_with_results(
216
        self: Pin<&Self>,
217
        keys: &[StoreKey<'_>],
218
        results: &mut [Option<u64>],
219
6
    ) -> Result<(), Error> {
220
        // TODO(caass): Optimize for the case where `keys.len() == 1`
221
6
        let pipeline = self.client_pool.next().pipeline();
222
6
223
6
        results.iter_mut().for_each(|result| *result = None);
224
225
6
        for (idx, key) in keys.iter().enumerate() {
226
            // Don't bother with zero-length digests.
227
6
            if is_zero_digest(key.borrow()) {
  Branch (227:16): [True: 2, False: 4]
  Branch (227:16): [Folded - Ignored]
228
2
                results[idx] = Some(0);
229
2
                continue;
230
4
            }
231
4
232
4
            let encoded_key = self.encode_key(key);
233
4
234
4
            // This command is queued in memory, but not yet sent down the pipeline; the `await` returns instantly.
235
4
            pipeline
236
4
                .strlen::<(), _>(encoded_key.as_ref())
237
0
                .await
238
4
                .err_tip(|| "In RedisStore::has_with_results")?;
239
        }
240
241
        // Send the queued commands.
242
6
        let mut responses = pipeline.all::<Vec<_>>().await?.into_iter();
243
6
        let mut remaining_results = results.iter_mut().filter(|option| {
244
6
            // Anything that's `Some` was already set from `is_zero_digest`.
245
6
            option.is_none()
246
6
        });
247
248
        // Similar to `Iterator::zip`, but with some verification at the end that the lengths were equal.
249
10
        while let (Some(response), Some(result_slot)) = (responses.next(), remaining_results.next())
  Branch (249:19): [True: 4, False: 6]
  Branch (249:19): [Folded - Ignored]
250
        {
251
4
            if response == 0 {
  Branch (251:16): [True: 0, False: 4]
  Branch (251:16): [Folded - Ignored]
252
                // Redis returns 0 when the key doesn't exist AND when the key exists with value of length 0.
253
                // Since we already checked zero-lengths with `is_zero_digest`, this means the value doesn't exist.
254
0
                continue;
255
4
            }
256
4
257
4
            *result_slot = Some(response);
258
        }
259
260
6
        if responses.next().is_some() {
  Branch (260:12): [True: 0, False: 6]
  Branch (260:12): [Folded - Ignored]
261
0
            Err(make_err!(
262
0
                Code::Internal,
263
0
                "Received more responses than expected in RedisStore::has_with_results"
264
0
            ))
265
6
        } else if remaining_results.next().is_some() {
  Branch (265:19): [True: 0, False: 6]
  Branch (265:19): [Folded - Ignored]
266
0
            Err(make_err!(
267
0
                Code::Internal,
268
0
                "Received fewer responses than expected in RedisStore::has_with_results"
269
0
            ))
270
        } else {
271
6
            Ok(())
272
        }
273
12
    }
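
    // A minimal sketch of the pipelined existence probe above, reusing the
    // fred calls and imports from this file; `client` is assumed to be a
    // connected `RedisClient`, and the key names are hypothetical.
    async fn demo_pipelined_strlen(client: &RedisClient) -> Result<Vec<u64>, Error> {
        let pipeline = client.pipeline();
        for key in ["key-a", "key-b"] {
            // Queued in memory only; this `await` resolves immediately.
            pipeline.strlen::<(), _>(key).await?;
        }
        // A single round trip executes both commands; 0 means "missing or empty".
        Ok(pipeline.all::<Vec<u64>>().await?)
    }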
274
275
    async fn update(
276
        self: Pin<&Self>,
277
        key: StoreKey<'_>,
278
        mut reader: DropCloserReadHalf,
279
        _upload_size: UploadSizeInfo,
280
7
    ) -> Result<(), Error> {
281
7
        let final_key = self.encode_key(&key);
282
7
283
7
        // While the name generation function can be supplied by the user, we need to have the curly
284
7
        // braces in place in order to manage redis' hashing behavior and make sure that the temporary
285
7
        // key name and the final key name are directed to the same cluster node. See
286
7
        // https://redis.io/blog/redis-clustering-best-practices-with-keys/
287
7
        //
288
7
        // The TL;DR is that if we're in cluster mode and the names hash differently, we can't use request
289
7
        // pipelining. By using these braces, we tell redis to only hash the part of the temporary key that's
290
7
        // identical to the final key -- so they will always hash to the same node.
291
7
        let temp_key = format!(
292
7
            "temp-{}-{{{}}}",
293
7
            (self.temp_name_generator_fn)(),
294
7
            &final_key
295
7
        );
296
7
297
7
        let client = self.client_pool.next();
298
7
299
7
        // This loop is a little confusing at first glance, but essentially the process is:
300
7
        // - Get as much data from the reader as possible
301
7
        // - When the reader is empty, but the writer isn't done sending data, write that data to redis
302
7
        // - When the writer is done sending data, write the data and break from the loop
303
7
        //
304
7
        // At one extreme, we could append data in redis every time we read some bytes -- that is, make one TCP request
305
7
        // per channel read. This is wasteful since we anticipate reading many small chunks of bytes from the reader.
306
7
        //
307
7
        // At the other extreme, we could make a single TCP request to write all of the data all at once.
308
7
        // This could also be an issue if we read loads of data, since we'd send one massive TCP request
309
7
        // rather than a few moderately-sized requests.
310
7
        //
311
7
        // To compromise, we buffer opportunistically -- when the reader doesn't have any data ready to read, but it's
312
7
        // not done getting data, we flush the data we _have_ read to redis before waiting for the reader to get more.
313
7
        //
314
7
        // As a result of this, there will be a span of time where a key in Redis has only partial data. We want other
315
7
        // observers to notice atomic updates to keys, rather than partial updates, so we first write to a temporary key
316
7
        // and then rename that key once we're done appending data.
317
7
        let mut is_first_chunk = true;
318
7
        let mut eof_reached = false;
319
7
        let mut pipe = client.pipeline();
320
321
16
        while !eof_reached {
  Branch (321:15): [True: 12, False: 4]
  Branch (321:15): [Folded - Ignored]
322
12
            pipe = client.pipeline();
323
12
            let mut pipe_size = 0;
324
            const MAX_PIPE_SIZE: usize = 5 * 1024 * 1024; // 5 MB
325
326
12
            let chunk = reader
327
12
                .recv()
328
3
                .await
329
12
                .err_tip(|| "Failed to reach chunk in update in redis store")?;
330
331
11
            if chunk.is_empty() {
  Branch (331:16): [True: 6, False: 5]
  Branch (331:16): [Folded - Ignored]
332
                // There are three cases where we receive an empty chunk:
333
                // 1. The first chunk of a zero-digest key. We're required to treat all zero-digest keys as if they exist
334
                //    and are empty per the RBE spec, so we can just return early as if we've pushed it -- any attempts to
335
                //    read the value later will similarly avoid the network trip.
336
                // 2. This is an empty first chunk of a non-zero-digest key. In this case, we _do_ need to push up an
337
                //    empty key, but can skip the rest of the process around renaming since there's only the one operation.
338
                // 3. This is the last chunk (EOF) of a regular key. In that case we can skip pushing this chunk.
339
                //
340
                // In all three cases, we're done pushing data and can move it from the temporary key to the final key.
341
6
                if is_first_chunk && is_zero_digest(key.borrow()) {
  Branch (341:20): [True: 2, False: 4]
  Branch (341:38): [True: 2, False: 0]
  Branch (341:20): [Folded - Ignored]
  Branch (341:38): [Folded - Ignored]
342
                    // Case 1, a zero-digest key.
343
2
                    return Ok(());
344
4
                } else if is_first_chunk {
  Branch (344:27): [True: 0, False: 4]
  Branch (344:27): [Folded - Ignored]
345
                    // Case 2, an empty non-zero-digest key.
346
0
                    pipe.append::<(), _, _>(&temp_key, "")
347
0
                        .await
348
0
                        .err_tip(|| "While appending to temp key in RedisStore::update")?;
349
4
                };
350
351
                // Note: setting `eof_reached = true` and calling `continue` is semantically equivalent to `break`.
352
                // Since we need to use the `eof_reached` flag in the inner loop, we do the same here
353
                // for consistency.
354
4
                eof_reached = true;
355
4
                continue;
356
5
            } else {
357
5
                // Not EOF, but we've now received our first chunk.
358
5
                is_first_chunk = false;
359
5
            }
360
5
361
5
            // Queue the append, but don't execute until we've received all the chunks.
362
5
            pipe_size += chunk.len();
363
5
            pipe.append::<(), _, _>(&temp_key, chunk)
364
0
                .await
365
5
                .err_tip(|| "Failed to append to temp key in RedisStore::update")?;
366
367
            // Opportunistically grab any other chunks already in the reader.
368
5
            while let Some(chunk) = reader
  Branch (368:23): [True: 0, False: 5]
  Branch (368:23): [Folded - Ignored]
369
5
                .try_recv()
370
5
                .transpose()
371
5
                .err_tip(|| "Failed to reach chunk in update in redis store")?
372
            {
373
0
                if chunk.is_empty() {
  Branch (373:20): [True: 0, False: 0]
  Branch (373:20): [Folded - Ignored]
374
0
                    eof_reached = true;
375
0
                    break;
376
                } else {
377
0
                    pipe_size += chunk.len();
378
0
                    pipe.append::<(), _, _>(&temp_key, chunk)
379
0
                        .await
380
0
                        .err_tip(|| "Failed to append to temp key in RedisStore::update")?;
381
                }
382
383
                // Stop appending if the pipeline is already holding 5MB of data.
384
0
                if pipe_size >= MAX_PIPE_SIZE {
  Branch (384:20): [True: 0, False: 0]
  Branch (384:20): [Folded - Ignored]
385
0
                    break;
386
0
                }
387
            }
388
389
            // We've exhausted the reader (or hit the 5MB cap), but more data is expected.
390
            // Executing the queued commands appends the data we just received to the temp key.
391
5
            pipe.all::<()>()
392
5
                .await
393
5
                .err_tip(|| "Failed to append to temporary key in RedisStore::update")?;
394
        }
395
396
        // Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost.
397
4
        pipe.rename::<(), _, _>(&temp_key, final_key.as_ref())
398
0
            .await
399
4
            .err_tip(|| "While queueing key rename in RedisStore::update()")?;
400
4
        pipe.all::<()>()
401
4
            .await
402
4
            .err_tip(|| "While renaming key in RedisStore::update()")?;
403
404
        // If we have a publish channel configured, send a notice that the key has been set.
405
4
        if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (405:16): [True: 0, False: 4]
  Branch (405:16): [Folded - Ignored]
406
0
            return Ok(client.publish(pub_sub_channel, final_key.as_ref()).await?);
407
4
        };
408
4
409
4
        Ok(())
410
14
    }
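
    // A minimal sketch of the append-then-rename pattern above, with
    // hypothetical keys and payloads; `client` is assumed to be a connected
    // `RedisClient`. The `{final}` hash tag matters in cluster mode: Redis
    // hashes only the text inside the braces, so the temporary and final keys
    // always land on the same node and RENAME stays valid.
    async fn demo_append_then_rename(client: &RedisClient) -> Result<(), Error> {
        let pipe = client.pipeline();
        pipe.append::<(), _, _>("temp-1234-{final}", "hello ").await?;
        pipe.append::<(), _, _>("temp-1234-{final}", "world").await?;
        pipe.all::<()>().await?;
        // Readers of "final" never observe the partial "hello " state; the
        // rename makes the fully-written value visible in one step.
        let pipe = client.pipeline();
        pipe.rename::<(), _, _>("temp-1234-{final}", "final").await?;
        pipe.all::<()>().await?;
        Ok(())
    }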
411
412
    async fn get_part(
413
        self: Pin<&Self>,
414
        key: StoreKey<'_>,
415
        writer: &mut DropCloserWriteHalf,
416
        offset: u64,
417
        length: Option<u64>,
418
5
    ) -> Result<(), Error> {
419
5
        let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?;
420
5
        let length = length
421
5
            .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
422
5
            .transpose()?;
423
424
        // To follow RBE spec we need to consider any digest's with
425
        // zero size to be existing.
426
5
        if is_zero_digest(key.borrow()) {
  Branch (426:12): [True: 0, False: 5]
  Branch (426:12): [Folded - Ignored]
427
0
            return writer
428
0
                .send_eof()
429
0
                .err_tip(|| "Failed to send zero EOF in redis store get_part");
430
5
        }
431
5
432
5
        let client = self.client_pool.next();
433
5
        let encoded_key = self.encode_key(&key);
434
5
        let encoded_key = encoded_key.as_ref();
435
5
436
5
        // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we
437
5
        // do math with indices we subtract one to convert our exclusive end bounds into inclusive ones.
438
5
439
5
        // We want to read the data at the key from `offset` to `offset + length`.
440
5
        let data_start = offset;
441
5
        let data_end = data_start
442
5
            .saturating_add(length.unwrap_or(isize::MAX as usize))
443
5
            .saturating_sub(1);
444
5
445
5
        // And we don't ever want to read more than `READ_CHUNK_SIZE` bytes at a time, so we'll need to iterate.
446
5
        let mut chunk_start = data_start;
447
5
        let mut chunk_end = cmp::min(data_start.saturating_add(READ_CHUNK_SIZE) - 1, data_end);
448
449
        loop {
450
6
            let chunk: Bytes = client
451
6
                .getrange(encoded_key, chunk_start, chunk_end)
452
6
                .await
453
6
                .err_tip(|| "In RedisStore::get_part::getrange")?;
454
455
6
            let didnt_receive_full_chunk = chunk.len() < READ_CHUNK_SIZE;
456
6
            let reached_end_of_data = chunk_end == data_end;
457
6
458
6
            if didnt_receive_full_chunk || reached_end_of_data {
  Branch (458:16): [True: 5, False: 1]
  Branch (458:44): [True: 0, False: 1]
  Branch (458:16): [Folded - Ignored]
  Branch (458:44): [Folded - Ignored]
459
5
                if !chunk.is_empty() {
  Branch (459:20): [True: 4, False: 1]
  Branch (459:20): [Folded - Ignored]
460
4
                    writer
461
4
                        .send(chunk)
462
0
                        .await
463
4
                        .err_tip(|| "Failed to write data in RedisStore::get_part")?;
464
1
                }
465
466
5
                break; // No more data to read.
467
1
            }
468
1
469
1
            // We received a full chunk's worth of data, so write it...
470
1
            writer
471
1
                .send(chunk)
472
0
                .await
473
1
                .err_tip(|| "Failed to write data in RedisStore::get_part")?;
474
475
            // ...and go grab the next chunk.
476
1
            chunk_start = chunk_end + 1;
477
1
            chunk_end = cmp::min(chunk_start.saturating_add(READ_CHUNK_SIZE) - 1, data_end);
478
        }
479
480
        // If we didn't write any data, check if the key exists, if not return a NotFound error.
481
        // This is required by spec.
482
5
        if writer.get_bytes_written() == 0 {
  Branch (482:12): [True: 1, False: 4]
  Branch (482:12): [Folded - Ignored]
483
            // We're supposed to read 0 bytes, so just check if the key exists.
484
1
            let exists = client
485
1
                .exists::<bool, _>(encoded_key)
486
1
                .await
487
1
                .err_tip(|| "In RedisStore::get_part::zero_exists")?;
488
489
1
            if !exists {
  Branch (489:16): [True: 1, False: 0]
  Branch (489:16): [Folded - Ignored]
490
1
                return Err(make_err!(
491
1
                    Code::NotFound,
492
1
                    "Data not found in Redis store for digest: {key:?}"
493
1
                ));
494
0
            }
495
4
        }
496
497
4
        writer
498
4
            .send_eof()
499
4
            .err_tip(|| "Failed to write EOF in redis store get_part")
500
10
    }
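
    // The chunk windows above use inclusive end indices because GETRANGE is
    // inclusive on both sides. A small arithmetic sketch with hypothetical
    // numbers (READ_CHUNK_SIZE is 64 KiB = 65_536):
    fn demo_chunk_windows() {
        let (offset, length) = (0usize, 100_000usize);
        let data_start = offset;
        let data_end = data_start + length - 1; // 99_999, inclusive.
        let first_chunk = (data_start, data_start + READ_CHUNK_SIZE - 1); // (0, 65_535)
        let second_chunk = (first_chunk.1 + 1, data_end); // starts right after.
        assert_eq!(second_chunk, (65_536, 99_999));
    }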
501
502
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
503
0
        self
504
0
    }
505
506
0
    fn as_any(&self) -> &(dyn std::any::Any + Sync + Send) {
507
0
        self
508
0
    }
509
510
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send> {
511
0
        self
512
0
    }
513
514
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
515
0
        registry.register_indicator(self);
516
0
    }
517
}
518
519
#[async_trait]
520
impl HealthStatusIndicator for RedisStore {
521
0
    fn get_name(&self) -> &'static str {
522
0
        "RedisStore"
523
0
    }
524
525
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
526
0
        StoreDriver::check_health(Pin::new(self), namespace).await
527
0
    }
528
}
529
530
/// -------------------------------------------------------------------
531
/// Below this line are specific to the redis scheduler implementation.
532
/// -------------------------------------------------------------------
533
534
/// The maximum number of results to return per cursor.
535
const MAX_COUNT_PER_CURSOR: u64 = 256;
536
/// The time in milliseconds that a redis cursor can be idle before it is closed.
537
const CURSOR_IDLE_MS: u64 = 2_000;
538
/// The name of the field in the Redis hash that stores the data.
539
const DATA_FIELD_NAME: &str = "data";
540
/// The name of the field in the Redis hash that stores the version.
541
const VERSION_FIELD_NAME: &str = "version";
542
/// The time to live of indexes in seconds. After this time redis may delete the index.
543
const INDEX_TTL_S: u64 = 60 * 60 * 24; // 24 hours.
544
545
/// String of the `FT.CREATE` command used to create the index template. It is done this
546
/// way so we can use it in both const (compile time) functions and runtime functions.
547
/// This is a macro because we need to use it in multiple places that sometimes require the
548
/// data as different data types (specifically for rust's format_args! macro).
549
macro_rules! get_create_index_template {
550
    () => {
551
        "FT.CREATE {} ON HASH PREFIX 1 {} NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA {} TAG CASESENSITIVE SORTABLE"
552
    }
553
}
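
/// With hypothetical index, prefix, and field names, the template above
/// renders to:
///
///   FT.CREATE my_index ON HASH PREFIX 1 my_prefix NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA my_field TAG CASESENSITIVE SORTABLE
///
/// A sketch of that render, relying on `format!` accepting a macro that
/// expands to a string literal (the usage the comment above describes):
#[cfg(test)]
fn demo_rendered_template() -> String {
    format!(get_create_index_template!(), "my_index", "my_prefix", "my_field")
}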
554
555
/// Lua script to set a key if the version matches.
556
/// Args:
557
///   KEYS[1]: The key where the version is stored.
558
///   ARGV[1]: The expected version.
559
///   ARGV[2]: The new data.
560
///   ARGV[3*]: Key-value pairs of additional data to include.
561
/// Returns:
562
///   The new version if the version matches. 0 is returned if the
563
///   value was not set.
564
const LUA_VERSION_SET_SCRIPT: &str = formatcp!(
565
    r#"
566
local key = KEYS[1]
567
local expected_version = tonumber(ARGV[1])
568
local new_data = ARGV[2]
569
local new_version = redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', 1)
570
local i
571
local indexes = {{}}
572
573
if new_version-1 ~= expected_version then
574
    redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', -1)
575
    return 0
576
end
577
-- Skip first 2 argvs, as they are known inputs.
578
-- Remember: Lua is 1-indexed.
579
for i=3, #ARGV do
580
    indexes[i-2] = ARGV[i]
581
end
582
583
-- In testing we witnessed redis sometimes not update our FT indexes
584
-- resulting in stale data. It appears if we delete our keys then insert
585
-- them again it works and reduces risk significantly.
586
redis.call('DEL', key)
587
redis.call('HSET', key, '{DATA_FIELD_NAME}', new_data, '{VERSION_FIELD_NAME}', new_version, unpack(indexes))
588
589
return new_version
590
"#
591
);
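
/// A minimal sketch of invoking the script above, mirroring the call made in
/// `update_data` further below. The key, expected version, and payload are
/// hypothetical, and `client` is assumed to be a connected `RedisClient`.
async fn demo_version_set(client: &RedisClient) -> Result<u64, Error> {
    let script = Script::from_lua(LUA_VERSION_SET_SCRIPT);
    let argv: Vec<Bytes> = vec![
        Bytes::from("0"),           // ARGV[1]: the version we expect to see.
        Bytes::from("new payload"), // ARGV[2]: the data to store.
    ];
    // Returns the new version, or 0 if the expected version did not match.
    script
        .evalsha_with_reload::<u64, _, Vec<Bytes>>(client, vec!["demo-key"], argv)
        .await
        .err_tip(|| "In demo_version_set")
}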
592
593
/// Compile-time fingerprint of the `FT.CREATE` command used to create the index template.
594
/// This is a simple CRC32 checksum of the command string. We don't care about it actually
595
/// being a valid CRC32 checksum, just that it's a unique identifier with a low chance of
596
/// collision.
597
13
const fn fingerprint_create_index_template() -> u32 {
598
    const POLY: u32 = 0xEDB8_8320;
599
    const DATA: &[u8] = get_create_index_template!().as_bytes();
600
13
    let mut crc = 0xFFFF_FFFF;
601
13
    let mut i = 0;
602
1.32k
    while i < DATA.len() {
  Branch (602:11): [True: 1.31k, False: 13]
  Branch (602:11): [Folded - Ignored]
603
1.31k
        let byte = DATA[i];
604
1.31k
        crc ^= byte as u32;
605
1.31k
606
1.31k
        let mut j = 0;
607
11.8k
        while j < 8 {
  Branch (607:15): [True: 10.5k, False: 1.31k]
  Branch (607:15): [Folded - Ignored]
608
10.5k
            crc = if crc & 1 != 0 {
  Branch (608:22): [True: 5.01k, False: 5.48k]
  Branch (608:22): [Folded - Ignored]
609
5.01k
                (crc >> 1) ^ POLY
610
            } else {
611
5.48k
                crc >> 1
612
            };
613
10.5k
            j += 1;
614
        }
615
1.31k
        i += 1;
616
    }
617
13
    crc
618
13
}
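
/// The loop above is the textbook bitwise, reflected CRC-32 (polynomial
/// 0xEDB88320) minus the usual final bit inversion, which the fingerprint
/// does not need. A hypothetical runtime mirror of the same computation:
fn crc32_without_final_xor(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &byte in data {
        crc ^= u32::from(byte);
        for _ in 0..8 {
            // Shift right, folding in the polynomial whenever the low bit is set.
            crc = if crc & 1 != 0 { (crc >> 1) ^ 0xEDB8_8320 } else { crc >> 1 };
        }
    }
    crc
}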
619
620
/// Get the name of the index to create for the given field.
621
/// This will add some prefix data to the name to try and ensure
622
/// if the index definition changes, the name will get a new name.
623
macro_rules! get_index_name {
624
    ($prefix:expr, $field:expr) => {
625
        format_args!(
626
            "{}_{}_{:08x}",
627
            $prefix,
628
            $field,
629
            fingerprint_create_index_template(),
630
        )
631
    };
632
}
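
/// With hypothetical inputs, the macro above yields a name of the shape
/// `<prefix>_<field>_<fingerprint-hex>`; the hex tail changes whenever the
/// `FT.CREATE` template changes, which forces a fresh index to be built:
#[cfg(test)]
fn demo_index_name() -> String {
    format!("{}", get_index_name!("cas", "sorted_field"))
}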
633
634
/// Try to sanitize a string to be used as a Redis key.
635
/// We don't actually modify the string, just check if it's valid.
636
2
const fn try_sanitize(s: &str) -> Option<&str> {
637
2
    // Note: We cannot use for loops or iterators here because they are not const.
638
2
    // Allowing us to use a const function here gives the compiler the ability to
639
2
    // optimize this function away entirely in the case where the input is constant.
640
2
    let chars = s.as_bytes();
641
2
    let mut i: usize = 0;
642
2
    let len = s.len();
643
    loop {
644
180
        if i >= len {
  Branch (644:12): [True: 2, False: 178]
  Branch (644:12): [Folded - Ignored]
645
2
            break;
646
178
        }
647
178
        let c = chars[i];
648
178
        if !c.is_ascii_alphanumeric() && c != b'_' {
  Branch (648:12): [True: 10, False: 168]
  Branch (648:42): [True: 0, False: 10]
  Branch (648:12): [Folded - Ignored]
  Branch (648:42): [Folded - Ignored]
649
0
            return None;
650
178
        }
651
178
        i += 1;
652
    }
653
2
    Some(s)
654
2
}
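
/// A quick sketch of the accepted alphabet (ASCII alphanumerics plus '_'),
/// with hypothetical inputs:
#[cfg(test)]
fn demo_try_sanitize() {
    assert_eq!(try_sanitize("job_id_123"), Some("job_id_123"));
    assert_eq!(try_sanitize("job-id"), None); // '-' is rejected.
}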
655
656
/// An individual subscription to a key in Redis.
657
pub struct RedisSubscription {
658
    receiver: Option<tokio::sync::watch::Receiver<String>>,
659
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
660
}
661
662
impl SchedulerSubscription for RedisSubscription {
663
    /// Wait for the subscription key to change.
664
7
    async fn changed(&mut self) -> Result<(), Error> {
665
7
        let receiver = self
666
7
            .receiver
667
7
            .as_mut()
668
7
            .ok_or_else(|| make_err!(Code::Internal, "In RedisSubscription::changed::as_mut"))?;
669
7
        receiver
670
7
            .changed()
671
5
            .await
672
6
            .map_err(|_| make_err!(Code::Internal, "In RedisSubscription::changed::changed"))
673
6
    }
674
}
675
676
// If the subscription is dropped, we need to possibly remove the key from the
677
// subscribed keys map.
678
impl Drop for RedisSubscription {
679
2
    fn drop(&mut self) {
680
2
        let Some(receiver) = self.receiver.take() else {
  Branch (680:13): [True: 2, False: 0]
  Branch (680:13): [Folded - Ignored]
681
0
            event!(
682
0
                Level::WARN,
683
0
                "RedisSubscription has already been dropped, nothing to do."
684
            );
685
0
            return; // Already dropped, nothing to do.
686
        };
687
2
        let key = receiver.borrow().clone();
688
2
        // IMPORTANT: This must be dropped before receiver_count() is called.
689
2
        drop(receiver);
690
2
        let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else {
  Branch (690:13): [True: 2, False: 0]
  Branch (690:13): [Folded - Ignored]
691
0
            return; // Already dropped, nothing to do.
692
        };
693
2
        let mut subscribed_keys = subscribed_keys.write();
694
2
        let Some(value) = subscribed_keys.get(&key) else {
  Branch (694:13): [True: 2, False: 0]
  Branch (694:13): [Folded - Ignored]
695
0
            event!(
696
0
                Level::ERROR,
697
0
                "Key {key} was not found in subscribed keys when checking if it should be removed."
698
            );
699
0
            return;
700
        };
701
        // If we have no receivers, cleanup the entry from our map.
702
2
        if value.receiver_count() == 0 {
  Branch (702:12): [True: 2, False: 0]
  Branch (702:12): [Folded - Ignored]
703
2
            subscribed_keys.remove(key);
704
2
        }
0
705
2
    }
706
}
707
708
/// A publisher for a key in Redis.
709
struct RedisSubscriptionPublisher {
710
    sender: Mutex<tokio::sync::watch::Sender<String>>,
711
}
712
713
impl RedisSubscriptionPublisher {
714
2
    fn new(
715
2
        key: String,
716
2
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
717
2
    ) -> (Self, RedisSubscription) {
718
2
        let (sender, receiver) = tokio::sync::watch::channel(key);
719
2
        let publisher = Self {
720
2
            sender: Mutex::new(sender),
721
2
        };
722
2
        let subscription = RedisSubscription {
723
2
            receiver: Some(receiver),
724
2
            weak_subscribed_keys,
725
2
        };
726
2
        (publisher, subscription)
727
2
    }
728
729
0
    fn subscribe(
730
0
        &self,
731
0
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
732
0
    ) -> RedisSubscription {
733
0
        let receiver = self.sender.lock().subscribe();
734
0
        RedisSubscription {
735
0
            receiver: Some(receiver),
736
0
            weak_subscribed_keys,
737
0
        }
738
0
    }
739
740
2
    fn receiver_count(&self) -> usize {
741
2
        self.sender.lock().receiver_count()
742
2
    }
743
744
4
    fn notify(&self) {
745
4
        // TODO(https://github.com/sile/patricia_tree/issues/40) When this is addressed
746
4
        // we can remove the `Mutex` and use the mutable iterator directly.
747
4
        self.sender.lock().send_modify(|_| {});
748
4
    }
749
}
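
// The publisher/subscription pair above is a thin wrapper over
// `tokio::sync::watch`. A minimal standalone sketch of the same wake-up
// mechanics, with hypothetical names:
async fn demo_watch_mechanics() {
    let (sender, mut receiver) = tokio::sync::watch::channel(String::from("key-1"));
    // `subscribe` marks fresh receivers changed so their first wait fires at once.
    receiver.mark_changed();
    assert!(receiver.changed().await.is_ok());
    // `notify` sends no new data; it only wakes every waiting receiver.
    sender.send_modify(|_| {});
    assert!(receiver.changed().await.is_ok());
}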
750
751
pub struct RedisSubscriptionManager {
752
    subscribed_keys: Arc<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
753
    tx_for_test: tokio::sync::mpsc::UnboundedSender<String>,
754
    _subscription_spawn: JoinHandleDropGuard<()>,
755
}
756
757
impl RedisSubscriptionManager {
758
1
    pub fn new(subscribe_client: SubscriberClient, pub_sub_channel: String) -> Self {
759
1
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
760
1
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);
761
1
        let (tx_for_test, mut rx_for_test) = tokio::sync::mpsc::unbounded_channel();
762
1
        Self {
763
1
            subscribed_keys,
764
1
            tx_for_test,
765
1
            _subscription_spawn: spawn!("redis_subscribe_spawn", async move {
766
1
                let mut rx = subscribe_client.message_rx();
767
                loop {
768
1
                    if let Err(e) = subscribe_client.subscribe(&pub_sub_channel).await {
  Branch (768:28): [True: 0, False: 1]
  Branch (768:28): [Folded - Ignored]
769
0
                        event!(Level::ERROR, "Error subscribing to pattern - {e}");
770
0
                        return;
771
1
                    }
772
1
                    let mut reconnect_rx = subscribe_client.reconnect_rx();
773
1
                    let reconnect_fut = reconnect_rx.recv().fuse();
774
1
                    tokio::pin!(reconnect_fut);
775
                    loop {
776
4
                        let key = select! {
777
4
                            value = rx_for_test.recv() => {
778
3
                                let Some(value) = value else {
  Branch (778:37): [True: 3, False: 0]
  Branch (778:37): [Folded - Ignored]
779
0
                                    unreachable!("Channel should never close");
780
                                };
781
3
                                value.into()
782
                            },
783
4
                            msg = rx.recv() => {
784
0
                                match msg {
785
0
                                    Ok(msg) => {
786
0
                                        if let RedisValue::String(s) = msg.value {
  Branch (786:48): [True: 0, False: 0]
  Branch (786:48): [Folded - Ignored]
787
0
                                            s
788
                                        } else {
789
0
                                            event!(Level::ERROR, "Received non-string message in RedisSubscriptionManager");
790
0
                                            continue;
791
                                        }
792
                                    },
793
0
                                    Err(e) => {
794
0
                                        // Check to see if our parent has been dropped and if so kill spawn.
795
0
                                        if subscribed_keys_weak.upgrade().is_none() {
  Branch (795:44): [True: 0, False: 0]
  Branch (795:44): [Folded - Ignored]
796
0
                                            event!(Level::WARN, "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn");
797
0
                                            return;
798
0
                                        };
799
0
                                        event!(Level::ERROR, "Error receiving message in RedisSubscriptionManager reconnecting and flagging everything changed - {e}");
800
0
                                        break;
801
                                    }
802
                                }
803
                            },
804
4
                            _ = &mut reconnect_fut => {
805
0
                                event!(Level::WARN, "Redis reconnected flagging all subscriptions as changed and resuming");
806
0
                                break;
807
                            }
808
                        };
809
3
                        let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
  Branch (809:29): [True: 3, False: 0]
  Branch (809:29): [Folded - Ignored]
810
0
                            event!(Level::WARN, "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn");
811
0
                            return;
812
                        };
813
3
                        let subscribed_keys_mux = subscribed_keys.read();
814
3
                        subscribed_keys_mux
815
3
                            .common_prefix_values(&*key)
816
3
                            .for_each(RedisSubscriptionPublisher::notify);
817
                    }
818
                    // Sleep for a small amount of time to ensure we don't reconnect too quickly.
819
0
                    sleep(Duration::from_secs(1)).await;
820
                    // If we reconnect or lag behind we might have had dirty keys, so we need to
821
                    // flag all of them as changed.
822
0
                    let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
  Branch (822:25): [True: 0, False: 0]
  Branch (822:25): [Folded - Ignored]
823
0
                        event!(Level::WARN, "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn");
824
0
                        return;
825
                    };
826
0
                    let subscribed_keys_mux = subscribed_keys.read();
827
0
                    // Just in case also get a new receiver.
828
0
                    rx = subscribe_client.message_rx();
829
0
                    // Drop all buffered messages, then flag everything as changed.
830
0
                    rx.resubscribe();
831
0
                    for publisher in subscribed_keys_mux.values() {
832
0
                        publisher.notify();
833
0
                    }
834
                }
835
1
            }),
836
        }
837
1
    }
838
}
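
// The spawn above fans a published key out to every subscription whose key is
// a prefix of it, via `common_prefix_values`. A small sketch with
// hypothetical keys and subscriber labels:
fn demo_prefix_fanout() {
    let mut subs = StringPatriciaMap::new();
    subs.insert("job_", "watch-all-jobs");
    subs.insert("job_123", "watch-one-job");
    subs.insert("task_9", "unrelated");
    // Publishing "job_123" visits both job subscriptions, but not "task_9".
    assert_eq!(subs.common_prefix_values("job_123").count(), 2);
}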
839
840
impl SchedulerSubscriptionManager for RedisSubscriptionManager {
841
    type Subscription = RedisSubscription;
842
843
3
    fn notify_for_test(&self, value: String) {
844
3
        self.tx_for_test.send(value).unwrap();
845
3
    }
846
847
2
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
848
2
    where
849
2
        K: SchedulerStoreKeyProvider,
850
2
    {
851
2
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
852
2
        let mut subscribed_keys = self.subscribed_keys.write();
853
2
        let key = key.get_key();
854
2
        let key_str = key.as_str();
855
2
        let mut subscription = if let Some(publisher) = subscribed_keys.get(&key_str) {
  Branch (855:39): [True: 0, False: 0]
  Branch (855:39): [Folded - Ignored]
  Branch (855:39): [Folded - Ignored]
  Branch (855:39): [True: 0, False: 2]
856
0
            publisher.subscribe(weak_subscribed_keys)
857
        } else {
858
2
            let (publisher, subscription) =
859
2
                RedisSubscriptionPublisher::new(key_str.to_string(), weak_subscribed_keys);
860
2
            subscribed_keys.insert(key_str, publisher);
861
2
            subscription
862
        };
863
2
        subscription
864
2
            .receiver
865
2
            .as_mut()
866
2
            .ok_or_else(|| {
867
0
                make_err!(
868
0
                    Code::Internal,
869
0
                    "Receiver should be set in RedisSubscriptionManager::subscribe"
870
0
                )
871
2
            })?
872
2
            .mark_changed();
873
2
874
2
        Ok(subscription)
875
2
    }
876
}
877
878
impl SchedulerStore for RedisStore {
879
    type SubscriptionManager = RedisSubscriptionManager;
880
881
3
    fn subscription_manager(&self) -> Result<Arc<RedisSubscriptionManager>, Error> {
882
3
        let mut subscription_manager = self.subscription_manager.lock();
883
3
        match &*subscription_manager {
884
2
            Some(subscription_manager) => Ok(subscription_manager.clone()),
885
            None => {
886
1
                let Some(pub_sub_channel) = &self.pub_sub_channel else {
  Branch (886:21): [True: 1, False: 0]
  Branch (886:21): [Folded - Ignored]
887
0
                    return Err(make_input_err!("RedisStore must have a pubsub channel for a Redis Scheduler if using subscriptions"));
888
                };
889
1
                let sub = Arc::new(RedisSubscriptionManager::new(
890
1
                    self.subscriber_client.clone(),
891
1
                    pub_sub_channel.clone(),
892
1
                ));
893
1
                *subscription_manager = Some(sub.clone());
894
1
                Ok(sub)
895
            }
896
        }
897
3
    }
898
899
3
    async fn update_data<T>(&self, data: T) -> Result<Option<u64>, Error>
900
3
    where
901
3
        T: SchedulerStoreDataProvider
902
3
            + SchedulerStoreKeyProvider
903
3
            + SchedulerCurrentVersionProvider
904
3
            + Send,
905
3
    {
906
3
        let key = data.get_key();
907
3
        let key = self.encode_key(&key);
908
3
        let client = self.client_pool.next();
909
3
        let maybe_index = data.get_indexes().err_tip(|| {
910
0
            format!("Err getting index in RedisStore::update_data::versioned for {key:?}")
911
3
        })?;
912
3
        if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {
  Branch (912:12): [True: 0, False: 0]
  Branch (912:12): [True: 0, False: 0]
  Branch (912:12): [Folded - Ignored]
  Branch (912:12): [Folded - Ignored]
  Branch (912:12): [True: 0, False: 1]
  Branch (912:12): [True: 2, False: 0]
913
2
            let current_version = data.current_version();
914
2
            let data = data.try_into_bytes().err_tip(|| {
915
0
                format!("Could not convert value to bytes in RedisStore::update_data::versioned for {key:?}")
916
2
            })?;
917
2
            let mut argv = Vec::with_capacity(3 + maybe_index.len() * 2);
918
2
            argv.push(Bytes::from(format!("{current_version}")));
919
2
            argv.push(data);
920
6
            for (name, value) in maybe_index {
921
4
                argv.push(Bytes::from_static(name.as_bytes()));
922
4
                argv.push(value);
923
4
            }
924
2
            let new_version = self
925
2
                .update_if_version_matches_script
926
2
                .evalsha_with_reload::<u64, _, Vec<Bytes>>(client, vec![key.as_ref()], argv)
927
2
                .await
928
2
                .err_tip(|| format!("In RedisStore::update_data::versioned for {key:?}"))?;
929
2
            if new_version == 0 {
  Branch (929:16): [True: 0, False: 0]
  Branch (929:16): [True: 0, False: 0]
  Branch (929:16): [Folded - Ignored]
  Branch (929:16): [Folded - Ignored]
  Branch (929:16): [True: 0, False: 0]
  Branch (929:16): [True: 0, False: 2]
930
0
                return Ok(None);
931
2
            }
932
            // If we have a publish channel configured, send a notice that the key has been set.
933
2
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (933:20): [True: 0, False: 0]
  Branch (933:20): [True: 0, False: 0]
  Branch (933:20): [Folded - Ignored]
  Branch (933:20): [Folded - Ignored]
  Branch (933:20): [True: 0, False: 0]
  Branch (933:20): [True: 2, False: 0]
934
2
                return Ok(client.publish(pub_sub_channel, key.as_ref()).await?);
935
0
            };
936
0
            Ok(Some(new_version))
937
        } else {
938
1
            let data = data.try_into_bytes().err_tip(|| {
939
0
                format!("Could not convert value to bytes in RedisStore::update_data::noversion for {key:?}")
940
1
            })?;
941
1
            let mut fields = RedisMap::new();
942
1
            fields.reserve(1 + maybe_index.len());
943
1
            fields.insert(DATA_FIELD_NAME.into(), data.into());
944
1
            for (name, value) in maybe_index {
945
0
                fields.insert(name.into(), value.into());
946
0
            }
947
1
            client
948
1
                .hset::<(), _, _>(key.as_ref(), fields)
949
1
                .await
950
1
                .err_tip(|| format!("In RedisStore::update_data::noversion for {key:?}"))?;
951
            // If we have a publish channel configured, send a notice that the key has been set.
952
1
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (952:20): [True: 0, False: 0]
  Branch (952:20): [True: 0, False: 0]
  Branch (952:20): [Folded - Ignored]
  Branch (952:20): [Folded - Ignored]
  Branch (952:20): [True: 1, False: 0]
  Branch (952:20): [True: 0, False: 0]
953
1
                return Ok(client.publish(pub_sub_channel, key.as_ref()).await?);
954
0
            };
955
0
            Ok(Some(0)) // Always use "0" version since this is not a versioned request.
956
        }
957
3
    }
958
959
1
    async fn search_by_index_prefix<K>(
960
1
        &self,
961
1
        index: K,
962
1
    ) -> Result<
963
1
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
964
1
        Error,
965
1
    >
966
1
    where
967
1
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
968
1
    {
969
1
        let index_value_prefix = index.index_value_prefix();
970
2
        let run_ft_aggregate = || {
971
2
            let client = self.client_pool.next().clone();
972
2
            let sanitized_field = try_sanitize(index_value_prefix.as_ref()).err_tip(|| {
973
0
                format!(
974
0
                    "In RedisStore::search_by_index_prefix::try_sanitize - {index_value_prefix:?}"
975
0
                )
976
2
            })?;
977
2
            Ok::<_, Error>(async move {
978
2
                ft_aggregate(
979
2
                    client,
980
2
                    format!("{}", get_index_name!(K::KEY_PREFIX, K::INDEX_NAME)),
981
2
                    format!("@{}:{{ {}* }}", K::INDEX_NAME, sanitized_field),
982
2
                    fred::types::FtAggregateOptions {
983
2
                        load: Some(fred::types::Load::Some(vec![
984
2
                            fred::types::SearchField {
985
2
                                identifier: DATA_FIELD_NAME.into(),
986
2
                                property: None,
987
2
                            },
988
2
                            fred::types::SearchField {
989
2
                                identifier: VERSION_FIELD_NAME.into(),
990
2
                                property: None,
991
2
                            },
992
2
                        ])),
993
2
                        cursor: Some(fred::types::WithCursor {
994
2
                            count: Some(MAX_COUNT_PER_CURSOR),
995
2
                            max_idle: Some(CURSOR_IDLE_MS),
996
2
                        }),
997
2
                        pipeline: vec![fred::types::AggregateOperation::SortBy {
998
2
                            properties: vec![(
999
2
                                format!("@{}", K::INDEX_NAME).into(),
1000
2
                                fred::types::SortOrder::Asc,
1001
2
                            )],
1002
2
                            max: None,
1003
2
                        }],
1004
2
                        ..Default::default()
1005
2
                    },
1006
2
                )
1007
2
                .await
1008
2
            })
1009
2
        };
1010
1
        let stream = if let Ok(stream) = run_ft_aggregate()?.await {
  Branch (1010:29): [True: 0, False: 0]
  Branch (1010:29): [True: 0, False: 0]
  Branch (1010:29): [Folded - Ignored]
  Branch (1010:29): [Folded - Ignored]
  Branch (1010:29): [True: 0, False: 1]
1011
0
            stream
1012
        } else {
1013
1
            let create_result = self
1014
1
                .client_pool
1015
1
                .next()
1016
1
                .ft_create::<(), _>(
1017
1
                    format!("{}", get_index_name!(K::KEY_PREFIX, K::INDEX_NAME)),
1018
1
                    FtCreateOptions {
1019
1
                        on: Some(fred::types::IndexKind::Hash),
1020
1
                        prefixes: vec![K::KEY_PREFIX.into()],
1021
1
                        nohl: true,
1022
1
                        nofields: true,
1023
1
                        nofreqs: true,
1024
1
                        nooffsets: true,
1025
1
                        temporary: Some(INDEX_TTL_S),
1026
1
                        ..Default::default()
1027
1
                    },
1028
1
                    vec![SearchSchema {
1029
1
                        field_name: K::INDEX_NAME.into(),
1030
1
                        alias: None,
1031
1
                        kind: SearchSchemaKind::Tag {
1032
1
                            sortable: true,
1033
1
                            unf: false,
1034
1
                            separator: None,
1035
1
                            casesensitive: false,
1036
1
                            withsuffixtrie: false,
1037
1
                            noindex: false,
1038
1
                        },
1039
1
                    }],
1040
1
                )
1041
1
                .await
1042
1
                .err_tip(|| {
1043
0
                    format!(
1044
0
                        "Error with ft_create in RedisStore::search_by_index_prefix({})",
1045
0
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME),
1046
0
                    )
1047
1
                });
1048
1
            let run_result = run_ft_aggregate()?.await.err_tip(|| {
1049
0
                format!(
1050
0
                    "Error with second ft_aggregate in RedisStore::search_by_index_prefix({})",
1051
0
                    get_index_name!(K::KEY_PREFIX, K::INDEX_NAME),
1052
0
                )
1053
1
            });
1054
1
            // Creating the index will race which is ok. If it fails to create, we only
1055
1
            // error if the second ft_aggregate call fails and fails to create.
1056
1
            match run_result {
1057
1
                Ok(stream) => stream,
1058
0
                Err(e) => return create_result.merge(Err(e)),
1059
            }
1060
        };
1061
1
        Ok(stream.map(|result| {
1062
0
            let mut redis_map =
1063
0
                result.err_tip(|| "Error in stream in RedisStore::search_by_index_prefix")?;
1064
0
            let bytes_data = redis_map
1065
0
                .remove(&RedisKey::from_static_str(DATA_FIELD_NAME))
1066
0
                .err_tip(|| "Missing data field in RedisStore::search_by_index_prefix")?
1067
0
                .into_bytes()
1068
0
                .err_tip(|| {
1069
0
                    formatcp!("'{DATA_FIELD_NAME}' is not Bytes in RedisStore::search_by_index_prefix::into_bytes")
1070
0
                })?;
1071
0
            let version = if <K as SchedulerIndexProvider>::Versioned::VALUE {
  Branch (1071:30): [True: 0, False: 0]
  Branch (1071:30): [True: 0, False: 0]
  Branch (1071:30): [Folded - Ignored]
  Branch (1071:30): [Folded - Ignored]
  Branch (1071:30): [True: 0, False: 0]
1072
0
                redis_map
1073
0
                    .remove(&RedisKey::from_static_str(VERSION_FIELD_NAME))
1074
0
                    .err_tip(|| "Missing version field in RedisStore::search_by_index_prefix")?
1075
0
                    .as_u64()
1076
0
                    .err_tip(|| {
1077
0
                        formatcp!("'{VERSION_FIELD_NAME}' is not u64 in RedisStore::search_by_index_prefix::as_u64")
1078
0
                    })?
1079
            } else {
1080
0
                0
1081
            };
1082
0
            K::decode(version, bytes_data)
1083
0
                .err_tip(|| "In RedisStore::search_by_index_prefix::decode")
1084
1
        }))
1085
1
    }
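
    // The aggregate query built above is a RediSearch TAG prefix match. A
    // sketch with hypothetical field and prefix values:
    fn demo_tag_query() {
        let query = format!("@{}:{{ {}* }}", "sorted_field", "abc");
        assert_eq!(query, "@sorted_field:{ abc* }");
    }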
1086
1087
2
    async fn get_and_decode<K>(
1088
2
        &self,
1089
2
        key: K,
1090
2
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
1091
2
    where
1092
2
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
1093
2
    {
1094
2
        let key = key.get_key();
1095
2
        let key = self.encode_key(&key);
1096
2
        let client = self.client_pool.next();
1097
2
        let (maybe_version, maybe_data) = client
1098
2
            .hmget::<(Option<u64>, Option<Bytes>), _, _>(
1099
2
                key.as_ref(),
1100
2
                vec![
1101
2
                    RedisKey::from(VERSION_FIELD_NAME),
1102
2
                    RedisKey::from(DATA_FIELD_NAME),
1103
2
                ],
1104
2
            )
1105
2
            .await
1106
2
            .err_tip(|| format!("In RedisStore::get_without_version::notversioned {key}"))?;
1107
2
        let Some(data) = maybe_data else {
  Branch (1107:13): [True: 0, False: 0]
  Branch (1107:13): [True: 0, False: 0]
  Branch (1107:13): [Folded - Ignored]
  Branch (1107:13): [Folded - Ignored]
  Branch (1107:13): [True: 2, False: 0]
1108
0
            return Ok(None);
1109
        };
1110
2
        Ok(Some(K::decode(maybe_version.unwrap_or(0), data).err_tip(
1111
2
            || format!("In RedisStore::get_with_version::notversioned::decode {key}"),
1112
2
        )?))
1113
2
    }
1114
}