Coverage Report

Created: 2025-04-19 16:54

/build/source/nativelink-store/src/redis_store.rs
Line | Count | Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::cmp;
17
use std::pin::Pin;
18
use std::sync::{Arc, Weak};
19
use std::time::Duration;
20
21
use async_trait::async_trait;
22
use bytes::Bytes;
23
use const_format::formatcp;
24
use fred::clients::{Pool as RedisPool, SubscriberClient};
25
use fred::interfaces::{ClientLike, KeysInterface, PubsubInterface};
26
use fred::prelude::{EventInterface, HashesInterface, RediSearchInterface};
27
use fred::types::config::{
28
    Config as RedisConfig, ConnectionConfig, PerformanceConfig, ReconnectPolicy, UnresponsiveConfig,
29
};
30
use fred::types::redisearch::{
31
    AggregateOperation, FtAggregateOptions, FtCreateOptions, IndexKind, Load, SearchField,
32
    SearchSchema, SearchSchemaKind, WithCursor,
33
};
34
use fred::types::scripts::Script;
35
use fred::types::{Builder, Key as RedisKey, Map as RedisMap, SortOrder, Value as RedisValue};
36
use futures::stream::FuturesUnordered;
37
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future};
38
use nativelink_config::stores::{RedisMode, RedisSpec};
39
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
40
use nativelink_metric::MetricsComponent;
41
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
42
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
43
use nativelink_util::spawn;
44
use nativelink_util::store_trait::{
45
    BoolValue, SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore,
46
    SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
47
    SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,
48
};
49
use nativelink_util::task::JoinHandleDropGuard;
50
use parking_lot::{Mutex, RwLock};
51
use patricia_tree::StringPatriciaMap;
52
use tokio::select;
53
use tokio::time::sleep;
54
use tracing::{Level, event};
55
use uuid::Uuid;
56
57
use crate::cas_utils::is_zero_digest;
58
use crate::redis_utils::ft_aggregate;
59
60
/// The default size of the read chunk when reading data from Redis.
61
/// Note: If this changes it should be updated in the config documentation.
62
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;
63
64
/// The default size of the connection pool if not specified.
65
/// Note: If this changes it should be updated in the config documentation.
66
const DEFAULT_CONNECTION_POOL_SIZE: usize = 3;
67
68
/// The default delay between retries if not specified.
69
/// Note: If this changes it should be updated in the config documentation.
70
const DEFAULT_RETRY_DELAY: f32 = 0.1;
71
/// The amount of jitter to add to the retry delay if not specified.
72
/// Note: If this changes it should be updated in the config documentation.
73
const DEFAULT_RETRY_JITTER: f32 = 0.5;
74
75
/// The default maximum capacity of the broadcast channel if not specified.
76
/// Note: If this changes it should be updated in the config documentation.
77
const DEFAULT_BROADCAST_CHANNEL_CAPACITY: usize = 4096;
78
79
/// The default connection timeout in milliseconds if not specified.
80
/// Note: If this changes it should be updated in the config documentation.
81
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;
82
83
/// The default command timeout in milliseconds if not specified.
84
/// Note: If this changes it should be updated in the config documentation.
85
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;
86
87
/// The default maximum number of chunk uploads per update.
88
/// Note: If this changes it should be updated in the config documentation.
89
const DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE: usize = 10;
90
91
#[expect(
92
    clippy::trivially_copy_pass_by_ref,
93
    reason = "must match method signature expected"
94
)]
95
1
fn to_hex(value: &u32) -> String {
96
1
    format!("{value:08x}")
97
1
}
98
99
/// A [`StoreDriver`] implementation that uses Redis as a backing store.
100
#[derive(Debug, MetricsComponent)]
101
pub struct RedisStore {
102
    /// The client pool connecting to the backing Redis instance(s).
103
    client_pool: RedisPool,
104
105
    /// A channel to publish updates to when a key is added, removed, or modified.
106
    #[metric(
107
        help = "The pubsub channel to publish updates to when a key is added, removed, or modified"
108
    )]
109
    pub_sub_channel: Option<String>,
110
111
    /// A redis client for managing subscriptions.
112
    /// TODO: This should be moved into the store in followups once a standard use pattern has been determined.
113
    subscriber_client: SubscriberClient,
114
115
    /// For metrics only.
116
    #[metric(
117
        help = "A unique identifier for the FT.CREATE command used to create the index template",
118
        handler = to_hex
119
    )]
120
    fingerprint_create_index: u32,
121
122
    /// A function used to generate names for temporary keys.
123
    temp_name_generator_fn: fn() -> String,
124
125
    /// A common prefix to append to all keys before they are sent to Redis.
126
    ///
127
    /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`).
128
    #[metric(help = "Prefix to append to all keys before sending to Redis")]
129
    key_prefix: String,
130
131
    /// The amount of data to read from Redis at a time.
132
    #[metric(help = "The amount of data to read from Redis at a time")]
133
    read_chunk_size: usize,
134
135
    /// The maximum number of chunk uploads per update.
136
    /// This is used to limit the number of chunk uploads per update to prevent
137
    #[metric(help = "The maximum number of chunk uploads per update")]
138
    max_chunk_uploads_per_update: usize,
139
140
    /// Redis script used to update a value in redis if the version matches.
141
    /// This is done by incrementing the version number and then setting the new data
142
    /// only if the version number matches the existing version number.
143
    update_if_version_matches_script: Script,
144
145
    /// A manager for subscriptions to keys in Redis.
146
    subscription_manager: Mutex<Option<Arc<RedisSubscriptionManager>>>,
147
}
148
149
impl RedisStore {
150
    /// Create a new `RedisStore` from the given configuration.
151
0
    pub fn new(mut spec: RedisSpec) -> Result<Arc<Self>, Error> {
152
0
        if spec.addresses.is_empty() {
  Branch (152:12): [True: 0, False: 0]
  Branch (152:12): [Folded - Ignored]
153
0
            return Err(make_err!(
154
0
                Code::InvalidArgument,
155
0
                "No addresses were specified in redis store configuration."
156
0
            ));
157
0
        };
158
0
        let [addr] = spec.addresses.as_slice() else {
  Branch (158:13): [True: 0, False: 0]
  Branch (158:13): [Folded - Ignored]
159
0
            return Err(make_err!(
160
0
                Code::Unimplemented,
161
0
                "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discover to find the other nodes."
162
0
            ));
163
        };
164
0
        let redis_config = match spec.mode {
165
0
            RedisMode::Cluster => RedisConfig::from_url_clustered(addr),
166
0
            RedisMode::Sentinel => RedisConfig::from_url_sentinel(addr),
167
0
            RedisMode::Standard => RedisConfig::from_url_centralized(addr),
168
        }
169
0
        .err_tip_with_code(|e| {
170
0
            (
171
0
                Code::InvalidArgument,
172
0
                format!("while parsing redis node address: {e}"),
173
0
            )
174
0
        })?;
175
176
0
        let reconnect_policy = {
177
0
            if spec.retry.delay == 0.0 {
  Branch (177:16): [True: 0, False: 0]
  Branch (177:16): [Folded - Ignored]
178
0
                spec.retry.delay = DEFAULT_RETRY_DELAY;
179
0
            }
180
0
            if spec.retry.jitter == 0.0 {
  Branch (180:16): [True: 0, False: 0]
  Branch (180:16): [Folded - Ignored]
181
0
                spec.retry.jitter = DEFAULT_RETRY_JITTER;
182
0
            }
183
184
0
            let max_retries = u32::try_from(spec.retry.max_retries)
185
0
                .err_tip(|| "max_retries could not be converted to u32 in RedisStore::new")?;
186
0
            let min_delay_ms = (spec.retry.delay * 1000.0) as u32;
187
0
            let max_delay_ms = 8000;
188
0
            let jitter = (spec.retry.jitter * spec.retry.delay * 1000.0) as u32;
189
0
190
0
            let mut reconnect_policy = ReconnectPolicy::new_exponential(
191
0
                max_retries,  /* max_retries, 0 is unlimited */
192
0
                min_delay_ms, /* min_delay */
193
0
                max_delay_ms, /* max_delay */
194
0
                2,            /* mult */
195
0
            );
196
0
            reconnect_policy.set_jitter(jitter);
197
0
            reconnect_policy
198
0
        };
199
0
200
0
        {
201
0
            if spec.broadcast_channel_capacity == 0 {
  Branch (201:16): [True: 0, False: 0]
  Branch (201:16): [Folded - Ignored]
202
0
                spec.broadcast_channel_capacity = DEFAULT_BROADCAST_CHANNEL_CAPACITY;
203
0
            }
204
0
            if spec.connection_timeout_ms == 0 {
  Branch (204:16): [True: 0, False: 0]
  Branch (204:16): [Folded - Ignored]
205
0
                spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS;
206
0
            }
207
0
            if spec.command_timeout_ms == 0 {
  Branch (207:16): [True: 0, False: 0]
  Branch (207:16): [Folded - Ignored]
208
0
                spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS;
209
0
            }
210
0
            if spec.connection_pool_size == 0 {
  Branch (210:16): [True: 0, False: 0]
  Branch (210:16): [Folded - Ignored]
211
0
                spec.connection_pool_size = DEFAULT_CONNECTION_POOL_SIZE;
212
0
            }
213
0
            if spec.read_chunk_size == 0 {
  Branch (213:16): [True: 0, False: 0]
  Branch (213:16): [Folded - Ignored]
214
0
                spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
215
0
            }
216
0
            if spec.max_chunk_uploads_per_update == 0 {
  Branch (216:16): [True: 0, False: 0]
  Branch (216:16): [Folded - Ignored]
217
0
                spec.max_chunk_uploads_per_update = DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE;
218
0
            }
219
        }
220
0
        let connection_timeout = Duration::from_millis(spec.connection_timeout_ms);
221
0
        let command_timeout = Duration::from_millis(spec.command_timeout_ms);
222
0
223
0
        let mut builder = Builder::from_config(redis_config);
224
0
        builder
225
0
            .set_performance_config(PerformanceConfig {
226
0
                default_command_timeout: command_timeout,
227
0
                broadcast_channel_capacity: spec.broadcast_channel_capacity,
228
0
                ..Default::default()
229
0
            })
230
0
            .set_connection_config(ConnectionConfig {
231
0
                connection_timeout,
232
0
                internal_command_timeout: command_timeout,
233
0
                unresponsive: UnresponsiveConfig {
234
0
                    max_timeout: Some(connection_timeout),
235
0
                    // This number needs to be less than the connection timeout.
236
0
                    // We use 4 as it is a good balance between not spamming the server
237
0
                    // and not waiting too long.
238
0
                    interval: connection_timeout / 4,
239
0
                },
240
0
                ..Default::default()
241
0
            })
242
0
            .set_policy(reconnect_policy);
243
244
0
        let client_pool = builder
245
0
            .build_pool(spec.connection_pool_size)
246
0
            .err_tip(|| "while creating redis connection pool")?;
247
248
0
        let subscriber_client = builder
249
0
            .build_subscriber_client()
250
0
            .err_tip(|| "while creating redis subscriber client")?;
251
252
0
        Self::new_from_builder_and_parts(
253
0
            client_pool,
254
0
            subscriber_client,
255
0
            spec.experimental_pub_sub_channel.clone(),
256
0
            || Uuid::new_v4().to_string(),
257
0
            spec.key_prefix.clone(),
258
0
            spec.read_chunk_size,
259
0
            spec.max_chunk_uploads_per_update,
260
0
        )
261
0
        .map(Arc::new)
262
0
    }
263
264
    /// Used for testing when determinism is required.
265
10
    pub fn new_from_builder_and_parts(
266
10
        client_pool: RedisPool,
267
10
        subscriber_client: SubscriberClient,
268
10
        pub_sub_channel: Option<String>,
269
10
        temp_name_generator_fn: fn() -> String,
270
10
        key_prefix: String,
271
10
        read_chunk_size: usize,
272
10
        max_chunk_uploads_per_update: usize,
273
10
    ) -> Result<Self, Error> {
274
10
        // Start connection pool (this will retry forever by default).
275
10
        client_pool.connect();
276
10
        subscriber_client.connect();
277
10
278
10
        Ok(Self {
279
10
            client_pool,
280
10
            pub_sub_channel,
281
10
            subscriber_client,
282
10
            fingerprint_create_index: fingerprint_create_index_template(),
283
10
            temp_name_generator_fn,
284
10
            key_prefix,
285
10
            read_chunk_size,
286
10
            max_chunk_uploads_per_update,
287
10
            update_if_version_matches_script: Script::from_lua(LUA_VERSION_SET_SCRIPT),
288
10
            subscription_manager: Mutex::new(None),
289
10
        })
290
10
    }
291
292
    /// Encode a [`StoreKey`] so it can be sent to Redis.
293
21
    fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> {
294
21
        let key_body = key.as_str();
295
21
        if self.key_prefix.is_empty() {
  Branch (295:12): [True: 17, False: 4]
  Branch (295:12): [Folded - Ignored]
296
17
            key_body
297
        } else {
298
            // This is in the hot path for all redis operations, so we try to reuse the allocation
299
            // from `key.as_str()` if possible.
300
4
            match key_body {
301
4
                Cow::Owned(mut encoded_key) => {
302
4
                    encoded_key.insert_str(0, &self.key_prefix);
303
4
                    Cow::Owned(encoded_key)
304
                }
305
0
                Cow::Borrowed(body) => {
306
0
                    let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());
307
0
                    encoded_key.push_str(&self.key_prefix);
308
0
                    encoded_key.push_str(body);
309
0
                    Cow::Owned(encoded_key)
310
                }
311
            }
312
        }
313
21
    }
314
}
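The key-prefix handling in encode_key is on the hot path, so it reuses the allocation from key.as_str() whenever that call already produced an owned String. A minimal standalone sketch of the same Cow pattern (the prefix and key values are hypothetical, not taken from this store):

    use std::borrow::Cow;

    fn prefix_key<'a>(prefix: &str, key_body: Cow<'a, str>) -> Cow<'a, str> {
        match key_body {
            // Owned input: splice the prefix into the existing buffer.
            Cow::Owned(mut owned) => {
                owned.insert_str(0, prefix);
                Cow::Owned(owned)
            }
            // Borrowed input: one allocation sized for prefix + body.
            Cow::Borrowed(body) => {
                let mut out = String::with_capacity(prefix.len() + body.len());
                out.push_str(prefix);
                out.push_str(body);
                Cow::Owned(out)
            }
        }
    }

    assert_eq!(prefix_key("p-", Cow::Owned("abc".into())), "p-abc");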
315
316
#[async_trait]
317
impl StoreDriver for RedisStore {
318
    async fn has_with_results(
319
        self: Pin<&Self>,
320
        keys: &[StoreKey<'_>],
321
        results: &mut [Option<u64>],
322
12
    ) -> Result<(), Error> {
323
        // TODO(allada) We could use pipeline here, but it makes retry more
324
        // difficult and it doesn't work very well in cluster mode.
325
        // If we wanted to optimize this with pipeline be careful to
326
        // implement retry and to support cluster mode.
327
6
        let client = self.client_pool.next();
328
6
        keys.iter()
329
6
            .zip(results.iter_mut())
330
6
            .map(|(key, result)| async move {
331
6
                // We need to do a special pass to ensure our zero key exists.
332
6
                if is_zero_digest(key.borrow()) {
  Branch (332:20): [True: 2, False: 4]
  Branch (332:20): [Folded - Ignored]
333
2
                    *result = Some(0);
334
2
                    return Ok::<_, Error>(());
335
4
                }
336
4
                let encoded_key = self.encode_key(key);
337
4
                let pipeline = client.pipeline();
338
4
                pipeline
339
4
                    .strlen::<(), _>(encoded_key.as_ref())
340
4
                    .await
341
4
                    .err_tip(|| {
342
0
                        format!("In RedisStore::has_with_results::strlen for {encoded_key}")
343
0
                    })?;
344
                // Redis returns 0 when the key doesn't exist
345
                // AND when the key exists with value of length 0.
346
                // Therefore, we need to check both length and existence
347
                // and do it in a pipeline for efficiency.
348
4
                pipeline
349
4
                    .exists::<(), _>(encoded_key.as_ref())
350
4
                    .await
351
4
                    .err_tip(|| {
352
0
                        format!("In RedisStore::has_with_results::exists for {encoded_key}")
353
0
                    })?;
354
4
                let (blob_len, exists) = pipeline
355
4
                    .all::<(u64, bool)>()
356
4
                    .await
357
4
                    .err_tip(|| "In RedisStore::has_with_results::query")?;
358
359
4
                *result = if exists { Some(blob_len) } else { None };
  Branch (359:30): [True: 4, False: 0]
  Branch (359:30): [Folded - Ignored]
360
361
4
                Ok::<_, Error>(())
362
12
            })
363
6
            .collect::<FuturesUnordered<_>>()
364
6
            .try_collect()
365
6
            .await
366
12
    }
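Context for the pipeline above: Redis STRLEN returns 0 both when a key is missing and when it holds an empty value, so EXISTS is pipelined alongside it to tell the two cases apart. A minimal sketch of that check, assuming fred's Client type (what the pool's next() hands out) and the Error/ResultExt imports at the top of this file; the helper name is hypothetical:

    async fn blob_len_if_exists(
        client: &fred::clients::Client,
        key: &str,
    ) -> Result<Option<u64>, Error> {
        let pipeline = client.pipeline();
        // STRLEN alone cannot distinguish "missing" from "empty"...
        pipeline.strlen::<(), _>(key).await.err_tip(|| "strlen")?;
        // ...so EXISTS rides in the same pipeline to disambiguate.
        pipeline.exists::<(), _>(key).await.err_tip(|| "exists")?;
        let (len, exists) = pipeline.all::<(u64, bool)>().await.err_tip(|| "query")?;
        Ok(exists.then_some(len))
    }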
367
368
    async fn update(
369
        self: Pin<&Self>,
370
        key: StoreKey<'_>,
371
        mut reader: DropCloserReadHalf,
372
        _upload_size: UploadSizeInfo,
373
14
    ) -> Result<(), Error> {
374
7
        let final_key = self.encode_key(&key);
375
7
376
7
        // While the name generation function can be supplied by the user, we need to have the curly
377
7
        // braces in place in order to manage redis' hashing behavior and make sure that the temporary
378
7
        // key name and the final key name are directed to the same cluster node. See
379
7
        // https://redis.io/blog/redis-clustering-best-practices-with-keys/
380
7
        //
381
7
        // The TL;DR is that if we're in cluster mode and the names hash differently, we can't use request
382
7
        // pipelining. By using these braces, we tell redis to only hash the part of the temporary key that's
383
7
        // identical to the final key -- so they will always hash to the same node.
384
7
        let temp_key = format!(
385
7
            "temp-{}-{{{}}}",
386
7
            (self.temp_name_generator_fn)(),
387
7
            &final_key
388
7
        );
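The braces in the temporary key are Redis cluster "hash tags": when a key contains {...}, only the braced portion is hashed to pick the slot, so the temp key and the final key are guaranteed to land on the same node. A small illustration with hypothetical values:

    // For final key "prefix-abc" and generated name "123e4567", the
    // temporary key hashes on "prefix-abc" only (the part in braces).
    let final_key = "prefix-abc";
    let temp_key = format!("temp-{}-{{{}}}", "123e4567", final_key);
    assert_eq!(temp_key, "temp-123e4567-{prefix-abc}");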
389
7
390
7
        if is_zero_digest(key.borrow()) {
  Branch (390:12): [True: 2, False: 5]
  Branch (390:12): [Folded - Ignored]
391
2
            let chunk = reader
392
2
                .peek()
393
2
                .await
394
2
                .err_tip(|| "Failed to peek in RedisStore::update")?;
395
2
            if chunk.is_empty() {
  Branch (395:16): [True: 2, False: 0]
  Branch (395:16): [Folded - Ignored]
396
2
                reader
397
2
                    .drain()
398
2
                    .await
399
2
                    .err_tip(|| "Failed to drain in RedisStore::update")?;
400
                // Zero-digest keys are special -- we don't need to do anything with them.
401
2
                return Ok(());
402
0
            }
403
5
        }
404
405
5
        let client = self.client_pool.next();
406
5
407
5
        let mut read_stream = reader
408
6
            .scan(0u32, |bytes_read, chunk_res| {
409
6
                future::ready(Some(
410
6
                    chunk_res
411
6
                        .err_tip(|| "Failed to read chunk in update in redis store")
412
6
                        .and_then(|chunk| {
413
5
                            let offset = *bytes_read;
414
5
                            let chunk_len = u32::try_from(chunk.len()).err_tip(
415
0
                                || "Could not convert chunk length to u32 in RedisStore::update",
416
0
                            )?;
417
5
                            let new_bytes_read = bytes_read
418
5
                                .checked_add(chunk_len)
419
5
                                .err_tip(|| "Overflow protection in RedisStore::update")?;
420
5
                            *bytes_read = new_bytes_read;
421
5
                            Ok::<_, Error>((offset, *bytes_read, chunk))
422
5
                        }),
423
                ))
424
6
            })
425
6
            .map(|res| {
426
6
                let (offset, end_pos, chunk) = res?;
427
5
                let temp_key_ref = &temp_key;
428
5
                Ok(async move {
429
5
                    client
430
5
                        .setrange::<(), _, _>(temp_key_ref, offset, chunk)
431
5
                        .await
432
5
                        .err_tip(
433
0
                            || "While appending to append to temp key in RedisStore::update",
434
0
                        )?;
435
5
                    Ok::<u32, Error>(end_pos)
436
5
                })
437
6
            })
438
5
            .try_buffer_unordered(self.max_chunk_uploads_per_update);
439
5
440
5
        let mut total_len: u32 = 0;
441
10
        while let Some(last_pos) = read_stream.try_next().await? {
  Branch (441:19): [True: 5, False: 4]
  Branch (441:19): [Folded - Ignored]
442
5
            if last_pos > total_len {
  Branch (442:16): [True: 5, False: 0]
  Branch (442:16): [Folded - Ignored]
443
5
                total_len = last_pos;
444
5
            }
445
        }
446
447
4
        let blob_len = client
448
4
            .strlen::<u64, _>(&temp_key)
449
4
            .await
450
4
            .err_tip(|| format!("In RedisStore::update strlen check for {temp_key}"))?;
451
        // This is a safety check to ensure that, in the event some kind of retry happened
452
        // and the data was appended to the key twice, we reject the data.
453
4
        if blob_len != u64::from(total_len) {
  Branch (453:12): [True: 0, False: 4]
  Branch (453:12): [Folded - Ignored]
454
0
            return Err(make_input_err!(
455
0
                "Data length mismatch in RedisStore::update for {}({}) - expected {} bytes, got {} bytes",
456
0
                key.borrow().as_str(),
457
0
                temp_key,
458
0
                total_len,
459
0
                blob_len,
460
0
            ));
461
4
        }
462
4
463
4
        // Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost.
464
4
        client
465
4
            .rename::<(), _, _>(&temp_key, final_key.as_ref())
466
4
            .await
467
4
            .err_tip(|| "While queueing key rename in RedisStore::update()")?;
468
469
        // If we have a publish channel configured, send a notice that the key has been set.
470
4
        if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (470:16): [True: 0, False: 4]
  Branch (470:16): [Folded - Ignored]
471
0
            return Ok(client.publish(pub_sub_channel, final_key.as_ref()).await?);
472
4
        };
473
4
474
4
        Ok(())
475
14
    }
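The upload path above writes each chunk with SETRANGE at a running byte offset, caps concurrency at max_chunk_uploads_per_update via try_buffer_unordered, and then verifies the final STRLEN against the accumulated total. A minimal sketch of just the offset bookkeeping (the chunk data is hypothetical):

    let chunks: Vec<&[u8]> = vec![b"hello ", b"world"];
    let mut offset: u32 = 0;
    for chunk in &chunks {
        let len = u32::try_from(chunk.len()).unwrap();
        // Would issue: SETRANGE temp_key <offset> <chunk>
        offset = offset.checked_add(len).expect("overflow protection");
    }
    // The sum of chunk lengths must match STRLEN of the temp key.
    assert_eq!(offset, 11);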
476
477
    async fn get_part(
478
        self: Pin<&Self>,
479
        key: StoreKey<'_>,
480
        writer: &mut DropCloserWriteHalf,
481
        offset: u64,
482
        length: Option<u64>,
483
10
    ) -> Result<(), Error> {
484
5
        let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?;
485
5
        let length = length
486
5
            .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
487
5
            .transpose()?;
488
489
        // To follow RBE spec we need to consider any digests with
490
        // zero size to be existing.
491
5
        if is_zero_digest(key.borrow()) {
  Branch (491:12): [True: 0, False: 5]
  Branch (491:12): [Folded - Ignored]
492
0
            return writer
493
0
                .send_eof()
494
0
                .err_tip(|| "Failed to send zero EOF in redis store get_part");
495
5
        }
496
5
497
5
        let client = self.client_pool.next();
498
5
        let encoded_key = self.encode_key(&key);
499
5
        let encoded_key = encoded_key.as_ref();
500
5
501
5
        // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we
502
5
        // do math with indices we change them to be exclusive at the end.
503
5
504
5
        // We want to read the data at the key from `offset` to `offset + length`.
505
5
        let data_start = offset;
506
5
        let data_end = data_start
507
5
            .saturating_add(length.unwrap_or(isize::MAX as usize))
508
5
            .saturating_sub(1);
509
5
510
5
        // And we don't ever want to read more than `read_chunk_size` bytes at a time, so we'll need to iterate.
511
5
        let mut chunk_start = data_start;
512
5
        let mut chunk_end = cmp::min(
513
5
            data_start.saturating_add(self.read_chunk_size) - 1,
514
5
            data_end,
515
        );
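GETRANGE is inclusive at both ends, which is what the -1 adjustments above compensate for. A worked example of the window math, assuming read_chunk_size = 4, offset = 3, and length = Some(10):

    let (offset, length, read_chunk_size) = (3usize, 10usize, 4usize);
    let data_start = offset;
    let data_end = data_start + length - 1;        // 12 (inclusive)
    let chunk_start = data_start;                  // 3
    let chunk_end = (data_start + read_chunk_size - 1).min(data_end); // 6
    // First request: GETRANGE key 3 6 -> exactly 4 bytes.
    assert_eq!((chunk_start, chunk_end, data_end), (3, 6, 12));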
516
517
        loop {
518
8
            let chunk: Bytes = client
519
8
                .getrange(encoded_key, chunk_start, chunk_end)
520
8
                .await
521
8
                .err_tip(|| "In RedisStore::get_part::getrange")?;
522
523
8
            let didnt_receive_full_chunk = chunk.len() < self.read_chunk_size;
524
8
            let reached_end_of_data = chunk_end == data_end;
525
8
526
8
            if didnt_receive_full_chunk || reached_end_of_data {
  Branch (526:16): [True: 4, False: 4]
  Branch (526:44): [True: 1, False: 3]
  Branch (526:16): [Folded - Ignored]
  Branch (526:44): [Folded - Ignored]
527
5
                if !chunk.is_empty() {
  Branch (527:20): [True: 4, False: 1]
  Branch (527:20): [Folded - Ignored]
528
4
                    writer
529
4
                        .send(chunk)
530
4
                        .await
531
4
                        .err_tip(|| "Failed to write data in RedisStore::get_part")?;
532
1
                }
533
534
5
                break; // No more data to read.
535
3
            }
536
3
537
3
            // We received a full chunk's worth of data, so write it...
538
3
            writer
539
3
                .send(chunk)
540
3
                .await
541
3
                .err_tip(|| "Failed to write data in RedisStore::get_part")?;
542
543
            // ...and go grab the next chunk.
544
3
            chunk_start = chunk_end + 1;
545
3
            chunk_end = cmp::min(
546
3
                chunk_start.saturating_add(self.read_chunk_size) - 1,
547
3
                data_end,
548
3
            );
549
        }
550
551
        // If we didn't write any data, check if the key exists, if not return a NotFound error.
552
        // This is required by spec.
553
5
        if writer.get_bytes_written() == 0 {
  Branch (553:12): [True: 1, False: 4]
  Branch (553:12): [Folded - Ignored]
554
            // We're supposed to read 0 bytes, so just check if the key exists.
555
1
            let exists = client
556
1
                .exists::<bool, _>(encoded_key)
557
1
                .await
558
1
                .err_tip(|| "In RedisStore::get_part::zero_exists")?;
559
560
1
            if !exists {
  Branch (560:16): [True: 1, False: 0]
  Branch (560:16): [Folded - Ignored]
561
1
                return Err(make_err!(
562
1
                    Code::NotFound,
563
1
                    "Data not found in Redis store for digest: {key:?}"
564
1
                ));
565
0
            }
566
4
        }
567
568
4
        writer
569
4
            .send_eof()
570
4
            .err_tip(|| "Failed to write EOF in redis store get_part")
571
10
    }
572
573
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
574
0
        self
575
0
    }
576
577
0
    fn as_any(&self) -> &(dyn std::any::Any + Sync + Send) {
578
0
        self
579
0
    }
580
581
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send> {
582
0
        self
583
0
    }
584
585
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
586
0
        registry.register_indicator(self);
587
0
    }
588
}
589
590
#[async_trait]
591
impl HealthStatusIndicator for RedisStore {
592
0
    fn get_name(&self) -> &'static str {
593
0
        "RedisStore"
594
0
    }
595
596
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
597
0
        StoreDriver::check_health(Pin::new(self), namespace).await
598
0
    }
599
}
600
601
// -------------------------------------------------------------------
602
// Items below this line are specific to the redis scheduler implementation.
603
// -------------------------------------------------------------------
604
605
/// The maximum number of results to return per cursor.
606
const MAX_COUNT_PER_CURSOR: u64 = 256;
607
/// The time in milliseconds that a redis cursor can be idle before it is closed.
608
const CURSOR_IDLE_MS: u64 = 2_000;
609
/// The name of the field in the Redis hash that stores the data.
610
const DATA_FIELD_NAME: &str = "data";
611
/// The name of the field in the Redis hash that stores the version.
612
const VERSION_FIELD_NAME: &str = "version";
613
/// The time to live of indexes in seconds. After this time redis may delete the index.
614
const INDEX_TTL_S: u64 = 60 * 60 * 24; // 24 hours.
615
616
/// String of the `FT.CREATE` command used to create the index template. It is done this
617
/// way so we can use it in both const (compile time) functions and runtime functions.
618
/// This is a macro because we need to use it in multiple places that sometimes require the
619
/// data as different data types (specifically for rust's `format_args!` macro).
620
macro_rules! get_create_index_template {
621
    () => {
622
        "FT.CREATE {} ON HASH PREFIX 1 {} NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA {} TAG CASESENSITIVE SORTABLE"
623
    }
624
}
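With hypothetical index name "my_index", key prefix "my_prefix", and field "my_field", the three {} slots in the template above fill in to:

    FT.CREATE my_index ON HASH PREFIX 1 my_prefix NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA my_field TAG CASESENSITIVE SORTABLE

i.e. a hash-backed RediSearch index over keys starting with the prefix, with a single case-sensitive, sortable TAG field and offset/frequency tracking disabled.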
625
626
/// Lua script to set a key if the version matches.
627
/// Args:
628
///   KEYS[1]: The key where the version is stored.
629
///   ARGV[1]: The expected version.
630
///   ARGV[2]: The new data.
631
///   ARGV[3*]: Key-value pairs of additional data to include.
632
/// Returns:
633
///   The new version if the version matches. nil is returned if the
634
///   value was not set.
635
const LUA_VERSION_SET_SCRIPT: &str = formatcp!(
636
    r"
637
local key = KEYS[1]
638
local expected_version = tonumber(ARGV[1])
639
local new_data = ARGV[2]
640
local new_version = redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', 1)
641
local i
642
local indexes = {{}}
643
644
if new_version-1 ~= expected_version then
645
    redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', -1)
646
    return 0
647
end
648
-- Skip first 2 argvs, as they are known inputs.
649
-- Remember: Lua is 1-indexed.
650
for i=3, #ARGV do
651
    indexes[i-2] = ARGV[i]
652
end
653
654
-- In testing we witnessed redis sometimes not updating our FT indexes,
655
-- resulting in stale data. It appears that if we delete our keys and insert
656
-- them again it works, which reduces the risk significantly.
657
redis.call('DEL', key)
658
redis.call('HSET', key, '{DATA_FIELD_NAME}', new_data, '{VERSION_FIELD_NAME}', new_version, unpack(indexes))
659
660
return new_version
661
"
662
);
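A minimal sketch of how this script is driven from Rust, mirroring the evalsha_with_reload call in update_data further down this file; the client, key, and values here are illustrative only:

    async fn cas_update(client: &fred::clients::Client) -> Result<u64, Error> {
        let script = Script::from_lua(LUA_VERSION_SET_SCRIPT);
        let argv: Vec<Bytes> = vec![
            Bytes::from("2"),        // ARGV[1]: expected current version
            Bytes::from("new-data"), // ARGV[2]: new payload for the data field
            // ARGV[3..]: optional index field/value pairs
        ];
        // Returns the incremented version, or 0 when the expected
        // version did not match and nothing was written.
        script
            .evalsha_with_reload::<u64, _, Vec<Bytes>>(client, vec!["some-key"], argv)
            .await
            .err_tip(|| "In LUA_VERSION_SET_SCRIPT sketch")
    }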
663
664
/// Compile-time fingerprint of the `FT.CREATE` command used to create the index template.
665
/// This is a simple CRC32 checksum of the command string. We don't care about it actually
666
/// being a valid CRC32 checksum, just that it's a unique identifier with a low chance of
667
/// collision.
668
13
const fn fingerprint_create_index_template() -> u32 {
669
    const POLY: u32 = 0xEDB8_8320;
670
    const DATA: &[u8] = get_create_index_template!().as_bytes();
671
13
    let mut crc = 0xFFFF_FFFF;
672
13
    let mut i = 0;
673
1.32k
    while i < DATA.len() {
  Branch (673:11): [True: 1.31k, False: 13]
  Branch (673:11): [Folded - Ignored]
674
1.31k
        let byte = DATA[i];
675
1.31k
        crc ^= byte as u32;
676
1.31k
677
1.31k
        let mut j = 0;
678
11.8k
        while j < 8 {
  Branch (678:15): [True: 10.5k, False: 1.31k]
  Branch (678:15): [Folded - Ignored]
679
10.5k
            crc = if crc & 1 != 0 {
  Branch (679:22): [True: 5.01k, False: 5.48k]
  Branch (679:22): [Folded - Ignored]
680
5.01k
                (crc >> 1) ^ POLY
681
            } else {
682
5.48k
                crc >> 1
683
            };
684
10.5k
            j += 1;
685
        }
686
1.31k
        i += 1;
687
    }
688
13
    crc
689
13
}
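Because the function is const, the fingerprint can also be evaluated at compile time; the runtime calls counted above (e.g. from new_from_builder_and_parts) return the identical value:

    const AT_COMPILE_TIME: u32 = fingerprint_create_index_template();
    let at_runtime = fingerprint_create_index_template();
    assert_eq!(AT_COMPILE_TIME, at_runtime);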
690
691
/// Get the name of the index to create for the given field.
692
/// This will add some prefix data to the name to try and ensure
693
/// that if the index definition changes, the index will get a new name.
694
macro_rules! get_index_name {
695
    ($prefix:expr, $field:expr, $maybe_sort:expr) => {
696
        format_args!(
697
            "{}_{}_{}_{:08x}",
698
            $prefix,
699
            $field,
700
            $maybe_sort.unwrap_or(""),
701
            fingerprint_create_index_template(),
702
        )
703
    };
704
}
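The generated name bakes the fingerprint in, so any change to the FT.CREATE template yields a fresh index name. With hypothetical prefix "cas", field "digest", and no sort key, the name has the shape "cas_digest__xxxxxxxx" (8 hex digits), mirroring how the macro is used in search_by_index_prefix below:

    let name = format!(
        "{}",
        get_index_name!("cas", "digest", Option::<&str>::None)
    );
    assert!(name.starts_with("cas_digest__"));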
705
706
/// Try to sanitize a string to be used as a Redis key.
707
/// We don't actually modify the string, just check if it's valid.
708
2
const fn try_sanitize(s: &str) -> Option<&str> {
709
2
    // Note: We cannot use for loops or iterators here because they are not const.
710
2
    // Allowing us to use a const function here gives the compiler the ability to
711
2
    // optimize this function away entirely in the case where the input is constant.
712
2
    let chars = s.as_bytes();
713
2
    let mut i: usize = 0;
714
2
    let len = s.len();
715
    loop {
716
180
        if i >= len {
  Branch (716:12): [True: 2, False: 178]
  Branch (716:12): [Folded - Ignored]
717
2
            break;
718
178
        }
719
178
        let c = chars[i];
720
178
        if !c.is_ascii_alphanumeric() && c != b'_' {
  Branch (720:12): [True: 10, False: 168]
  Branch (720:42): [True: 0, False: 10]
  Branch (720:12): [Folded - Ignored]
  Branch (720:42): [Folded - Ignored]
721
0
            return None;
722
178
        }
723
178
        i += 1;
724
    }
725
2
    Some(s)
726
2
}
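Usage sketch: the function returns the input unchanged when every byte is ASCII alphanumeric or '_', and None otherwise:

    assert_eq!(try_sanitize("abc_123"), Some("abc_123"));
    assert_eq!(try_sanitize("abc-123"), None); // '-' is rejected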
727
728
/// An individual subscription to a key in Redis.
729
#[derive(Debug)]
730
pub struct RedisSubscription {
731
    receiver: Option<tokio::sync::watch::Receiver<String>>,
732
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
733
}
734
735
impl SchedulerSubscription for RedisSubscription {
736
    /// Wait for the subscription key to change.
737
7
    async fn changed(&mut self) -> Result<(), Error> {
738
7
        let receiver = self
739
7
            .receiver
740
7
            .as_mut()
741
7
            .ok_or_else(|| make_err!(Code::Internal, "In RedisSubscription::changed::as_mut"))?;
742
7
        receiver
743
7
            .changed()
744
7
            .await
745
6
            .map_err(|_| make_err!(Code::Internal, "In RedisSubscription::changed::changed"))
746
6
    }
747
}
748
749
// If the subscription is dropped, we need to possibly remove the key from the
750
// subscribed keys map.
751
impl Drop for RedisSubscription {
752
2
    fn drop(&mut self) {
753
2
        let Some(receiver) = self.receiver.take() else {
  Branch (753:13): [True: 2, False: 0]
  Branch (753:13): [Folded - Ignored]
754
0
            event!(
755
0
                Level::WARN,
756
0
                "RedisSubscription has already been dropped, nothing to do."
757
            );
758
0
            return; // Already dropped, nothing to do.
759
        };
760
2
        let key = receiver.borrow().clone();
761
2
        // IMPORTANT: This must be dropped before receiver_count() is called.
762
2
        drop(receiver);
763
2
        let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else {
  Branch (763:13): [True: 2, False: 0]
  Branch (763:13): [Folded - Ignored]
764
0
            return; // Already dropped, nothing to do.
765
        };
766
2
        let mut subscribed_keys = subscribed_keys.write();
767
2
        let Some(value) = subscribed_keys.get(&key) else {
  Branch (767:13): [True: 2, False: 0]
  Branch (767:13): [Folded - Ignored]
768
0
            event!(
769
0
                Level::ERROR,
770
0
                "Key {key} was not found in subscribed keys when checking if it should be removed."
771
            );
772
0
            return;
773
        };
774
        // If we have no receivers, cleanup the entry from our map.
775
2
        if value.receiver_count() == 0 {
  Branch (775:12): [True: 2, False: 0]
  Branch (775:12): [Folded - Ignored]
776
2
            subscribed_keys.remove(key);
777
2
        }
778
2
    }
779
}
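The drop-order comment above matters because tokio's watch channel counts live receivers; the count only reaches zero once this subscription's own receiver is gone. A standalone sketch of that behavior:

    let (tx, rx) = tokio::sync::watch::channel(String::from("key"));
    assert_eq!(tx.receiver_count(), 1);
    drop(rx); // must happen before the count is inspected
    assert_eq!(tx.receiver_count(), 0); // now safe to remove the map entry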
780
781
/// A publisher for a key in Redis.
782
#[derive(Debug)]
783
struct RedisSubscriptionPublisher {
784
    sender: Mutex<tokio::sync::watch::Sender<String>>,
785
}
786
787
impl RedisSubscriptionPublisher {
788
2
    fn new(
789
2
        key: String,
790
2
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
791
2
    ) -> (Self, RedisSubscription) {
792
2
        let (sender, receiver) = tokio::sync::watch::channel(key);
793
2
        let publisher = Self {
794
2
            sender: Mutex::new(sender),
795
2
        };
796
2
        let subscription = RedisSubscription {
797
2
            receiver: Some(receiver),
798
2
            weak_subscribed_keys,
799
2
        };
800
2
        (publisher, subscription)
801
2
    }
802
803
0
    fn subscribe(
804
0
        &self,
805
0
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
806
0
    ) -> RedisSubscription {
807
0
        let receiver = self.sender.lock().subscribe();
808
0
        RedisSubscription {
809
0
            receiver: Some(receiver),
810
0
            weak_subscribed_keys,
811
0
        }
812
0
    }
813
814
2
    fn receiver_count(&self) -> usize {
815
2
        self.sender.lock().receiver_count()
816
2
    }
817
818
4
    fn notify(&self) {
819
4
        // TODO(https://github.com/sile/patricia_tree/issues/40) When this is addressed
820
4
        // we can remove the `Mutex` and use the mutable iterator directly.
821
4
        self.sender.lock().send_modify(|_| {});
822
4
    }
823
}
824
825
#[derive(Debug)]
826
pub struct RedisSubscriptionManager {
827
    subscribed_keys: Arc<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
828
    tx_for_test: tokio::sync::mpsc::UnboundedSender<String>,
829
    _subscription_spawn: JoinHandleDropGuard<()>,
830
}
831
832
impl RedisSubscriptionManager {
833
1
    pub fn new(subscribe_client: SubscriberClient, pub_sub_channel: String) -> Self {
834
1
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
835
1
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);
836
1
        let (tx_for_test, mut rx_for_test) = tokio::sync::mpsc::unbounded_channel();
837
1
        Self {
838
1
            subscribed_keys,
839
1
            tx_for_test,
840
1
            _subscription_spawn: spawn!("redis_subscribe_spawn", async move {
841
1
                let mut rx = subscribe_client.message_rx();
842
                loop {
843
1
                    if let Err(e) = subscribe_client.subscribe(&pub_sub_channel).await {
  Branch (843:28): [True: 0, False: 1]
  Branch (843:28): [Folded - Ignored]
844
0
                        event!(Level::ERROR, "Error subscribing to pattern - {e}");
845
0
                        return;
846
1
                    }
847
1
                    let mut reconnect_rx = subscribe_client.reconnect_rx();
848
1
                    let reconnect_fut = reconnect_rx.recv().fuse();
849
1
                    tokio::pin!(reconnect_fut);
850
                    loop {
851
4
                        let key = select! {
852
4
                            value = rx_for_test.recv() => {
853
3
                                let Some(value) = value else {
  Branch (853:37): [True: 3, False: 0]
  Branch (853:37): [Folded - Ignored]
854
0
                                    unreachable!("Channel should never close");
855
                                };
856
3
                                value.into()
857
                            },
858
4
                            msg = rx.recv() => {
859
0
                                match msg {
860
0
                                    Ok(msg) => {
861
0
                                        if let RedisValue::String(s) = msg.value {
  Branch (861:48): [True: 0, False: 0]
  Branch (861:48): [Folded - Ignored]
862
0
                                            s
863
                                        } else {
864
0
                                            event!(Level::ERROR, "Received non-string message in RedisSubscriptionManager");
865
0
                                            continue;
866
                                        }
867
                                    },
868
0
                                    Err(e) => {
869
0
                                        // Check to see if our parent has been dropped and if so kill spawn.
870
0
                                        if subscribed_keys_weak.upgrade().is_none() {
  Branch (870:44): [True: 0, False: 0]
  Branch (870:44): [Folded - Ignored]
871
0
                                            event!(Level::WARN, "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn");
872
0
                                            return;
873
0
                                        };
874
0
                                        event!(Level::ERROR, "Error receiving message in RedisSubscriptionManager reconnecting and flagging everything changed - {e}");
875
0
                                        break;
876
                                    }
877
                                }
878
                            },
879
4
                            _ = &mut reconnect_fut => {
880
0
                                event!(Level::WARN, "Redis reconnected flagging all subscriptions as changed and resuming");
881
0
                                break;
882
                            }
883
                        };
884
3
                        let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
  Branch (884:29): [True: 3, False: 0]
  Branch (884:29): [Folded - Ignored]
885
0
                            event!(
886
0
                                Level::WARN,
887
0
                                "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
888
                            );
889
0
                            return;
890
                        };
891
3
                        let subscribed_keys_mux = subscribed_keys.read();
892
3
                        subscribed_keys_mux
893
3
                            .common_prefix_values(&*key)
894
3
                            .for_each(RedisSubscriptionPublisher::notify);
895
                    }
896
                    // Sleep for a small amount of time to ensure we don't reconnect too quickly.
897
0
                    sleep(Duration::from_secs(1)).await;
898
                    // If we reconnect or lag behind we might have had dirty keys, so we need to
899
                    // flag all of them as changed.
900
0
                    let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
  Branch (900:25): [True: 0, False: 0]
  Branch (900:25): [Folded - Ignored]
901
0
                        event!(
902
0
                            Level::WARN,
903
0
                            "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
904
                        );
905
0
                        return;
906
                    };
907
0
                    let subscribed_keys_mux = subscribed_keys.read();
908
0
                    // Just in case also get a new receiver.
909
0
                    rx = subscribe_client.message_rx();
910
0
                    // Drop all buffered messages, then flag everything as changed.
911
0
                    rx.resubscribe();
912
0
                    for publisher in subscribed_keys_mux.values() {
913
0
                        publisher.notify();
914
0
                    }
915
                }
916
0
            }),
917
        }
918
1
    }
919
}
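The common_prefix_values call above is what fans a published key out to subscribers: every subscription whose key is a prefix of the published value gets notified. A small sketch of that lookup with hypothetical keys:

    use patricia_tree::StringPatriciaMap;

    let mut map = StringPatriciaMap::new();
    map.insert("jobs/1", "sub-a");
    map.insert("jobs/12", "sub-b");
    // Both stored keys are prefixes of "jobs/123", so both match.
    let hits: Vec<_> = map.common_prefix_values("jobs/123").collect();
    assert_eq!(hits.len(), 2);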
920
921
impl SchedulerSubscriptionManager for RedisSubscriptionManager {
922
    type Subscription = RedisSubscription;
923
924
3
    fn notify_for_test(&self, value: String) {
925
3
        self.tx_for_test.send(value).unwrap();
926
3
    }
927
928
2
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
929
2
    where
930
2
        K: SchedulerStoreKeyProvider,
931
2
    {
932
2
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
933
2
        let mut subscribed_keys = self.subscribed_keys.write();
934
2
        let key = key.get_key();
935
2
        let key_str = key.as_str();
936
2
        let mut subscription = if let Some(publisher) = subscribed_keys.get(&key_str) {
  Branch (936:39): [True: 0, False: 0]
  Branch (936:39): [Folded - Ignored]
  Branch (936:39): [Folded - Ignored]
  Branch (936:39): [True: 0, False: 2]
937
0
            publisher.subscribe(weak_subscribed_keys)
938
        } else {
939
2
            let (publisher, subscription) =
940
2
                RedisSubscriptionPublisher::new(key_str.to_string(), weak_subscribed_keys);
941
2
            subscribed_keys.insert(key_str, publisher);
942
2
            subscription
943
        };
944
2
        subscription
945
2
            .receiver
946
2
            .as_mut()
947
2
            .ok_or_else(|| {
948
0
                make_err!(
949
0
                    Code::Internal,
950
0
                    "Receiver should be set in RedisSubscriptionManager::subscribe"
951
0
                )
952
0
            })?
953
2
            .mark_changed();
954
2
955
2
        Ok(subscription)
956
2
    }
957
}
958
959
impl SchedulerStore for RedisStore {
960
    type SubscriptionManager = RedisSubscriptionManager;
961
962
3
    fn subscription_manager(&self) -> Result<Arc<RedisSubscriptionManager>, Error> {
963
3
        let mut subscription_manager = self.subscription_manager.lock();
964
965
3
        if let Some(subscription_manager) = &*subscription_manager {
  Branch (965:16): [True: 2, False: 1]
  Branch (965:16): [Folded - Ignored]
966
2
            Ok(subscription_manager.clone())
967
        } else {
968
1
            let Some(pub_sub_channel) = &self.pub_sub_channel else {
  Branch (968:17): [True: 1, False: 0]
  Branch (968:17): [Folded - Ignored]
969
0
                return Err(make_input_err!(
970
0
                    "RedisStore must have a pubsub channel for a Redis Scheduler if using subscriptions"
971
0
                ));
972
            };
973
1
            let sub = Arc::new(RedisSubscriptionManager::new(
974
1
                self.subscriber_client.clone(),
975
1
                pub_sub_channel.clone(),
976
1
            ));
977
1
            *subscription_manager = Some(sub.clone());
978
1
            Ok(sub)
979
        }
980
3
    }
981
982
3
    async fn update_data<T>(&self, data: T) -> Result<Option<u64>, Error>
983
3
    where
984
3
        T: SchedulerStoreDataProvider
985
3
            + SchedulerStoreKeyProvider
986
3
            + SchedulerCurrentVersionProvider
987
3
            + Send,
988
3
    {
989
3
        let key = data.get_key();
990
3
        let key = self.encode_key(&key);
991
3
        let client = self.client_pool.next();
992
3
        let maybe_index = data.get_indexes().err_tip(|| {
993
0
            format!("Err getting index in RedisStore::update_data::versioned for {key:?}")
994
0
        })?;
995
3
        if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {
  Branch (995:12): [True: 0, False: 0]
  Branch (995:12): [True: 0, False: 0]
  Branch (995:12): [Folded - Ignored]
  Branch (995:12): [Folded - Ignored]
  Branch (995:12): [True: 0, False: 1]
  Branch (995:12): [True: 2, False: 0]
996
2
            let current_version = data.current_version();
997
2
            let data = data.try_into_bytes().err_tip(|| {
998
0
                format!("Could not convert value to bytes in RedisStore::update_data::versioned for {key:?}")
999
0
            })?;
1000
2
            let mut argv = Vec::with_capacity(3 + maybe_index.len() * 2);
1001
2
            argv.push(Bytes::from(format!("{current_version}")));
1002
2
            argv.push(data);
1003
8
            for (name, value) in maybe_index {
1004
6
                argv.push(Bytes::from_static(name.as_bytes()));
1005
6
                argv.push(value);
1006
6
            }
1007
2
            let new_version = self
1008
2
                .update_if_version_matches_script
1009
2
                .evalsha_with_reload::<u64, _, Vec<Bytes>>(client, vec![key.as_ref()], argv)
1010
2
                .await
1011
2
                .err_tip(|| format!("In RedisStore::update_data::versioned for {key:?}"))?;
1012
2
            if new_version == 0 {
  Branch (1012:16): [True: 0, False: 0]
  Branch (1012:16): [True: 0, False: 0]
  Branch (1012:16): [Folded - Ignored]
  Branch (1012:16): [Folded - Ignored]
  Branch (1012:16): [True: 0, False: 0]
  Branch (1012:16): [True: 0, False: 2]
1013
0
                return Ok(None);
1014
2
            }
1015
            // If we have a publish channel configured, send a notice that the key has been set.
1016
2
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (1016:20): [True: 0, False: 0]
  Branch (1016:20): [True: 0, False: 0]
  Branch (1016:20): [Folded - Ignored]
  Branch (1016:20): [Folded - Ignored]
  Branch (1016:20): [True: 0, False: 0]
  Branch (1016:20): [True: 2, False: 0]
1017
2
                return Ok(client.publish(pub_sub_channel, key.as_ref()).await?);
1018
0
            };
1019
0
            Ok(Some(new_version))
1020
        } else {
1021
1
            let data = data.try_into_bytes().err_tip(|| {
1022
0
                format!("Could not convert value to bytes in RedisStore::update_data::noversion for {key:?}")
1023
0
            })?;
1024
1
            let mut fields = RedisMap::new();
1025
1
            fields.reserve(1 + maybe_index.len());
1026
1
            fields.insert(DATA_FIELD_NAME.into(), data.into());
1027
1
            for (name, value) in maybe_index {
1028
0
                fields.insert(name.into(), value.into());
1029
0
            }
1030
1
            client
1031
1
                .hset::<(), _, _>(key.as_ref(), fields)
1032
1
                .await
1033
1
                .err_tip(|| format!("In RedisStore::update_data::noversion for {key:?}"))?;
1034
            // If we have a publish channel configured, send a notice that the key has been set.
1035
1
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (1035:20): [True: 0, False: 0]
  Branch (1035:20): [True: 0, False: 0]
  Branch (1035:20): [Folded - Ignored]
  Branch (1035:20): [Folded - Ignored]
  Branch (1035:20): [True: 1, False: 0]
  Branch (1035:20): [True: 0, False: 0]
1036
1
                return Ok(client.publish(pub_sub_channel, key.as_ref()).await?);
1037
0
            };
1038
0
            Ok(Some(0)) // Always use "0" version since this is not a versioned request.
1039
        }
1040
3
    }
1041
1042
1
    async fn search_by_index_prefix<K>(
1043
1
        &self,
1044
1
        index: K,
1045
1
    ) -> Result<
1046
1
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
1047
1
        Error,
1048
1
    >
1049
1
    where
1050
1
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
1051
1
    {
1052
1
        let index_value = index.index_value();
1053
2
        let run_ft_aggregate = || {
1054
2
            let client = self.client_pool.next().clone();
1055
2
            let sanitized_field = try_sanitize(index_value.as_ref()).err_tip(|| {
1056
0
                format!("In RedisStore::search_by_index_prefix::try_sanitize - {index_value:?}")
1057
0
            })?;
1058
2
            Ok::<_, Error>(async move {
1059
2
                ft_aggregate(
1060
2
                    client,
1061
2
                    format!(
1062
2
                        "{}",
1063
2
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1064
2
                    ),
1065
2
                    format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_field),
1066
2
                    FtAggregateOptions {
1067
2
                        load: Some(Load::Some(vec![
1068
2
                            SearchField {
1069
2
                                identifier: DATA_FIELD_NAME.into(),
1070
2
                                property: None,
1071
2
                            },
1072
2
                            SearchField {
1073
2
                                identifier: VERSION_FIELD_NAME.into(),
1074
2
                                property: None,
1075
2
                            },
1076
2
                        ])),
1077
2
                        cursor: Some(WithCursor {
1078
2
                            count: Some(MAX_COUNT_PER_CURSOR),
1079
2
                            max_idle: Some(CURSOR_IDLE_MS),
1080
2
                        }),
1081
2
                        pipeline: vec![AggregateOperation::SortBy {
1082
2
                            properties: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| {
1083
0
                                vec![(format!("@{v}").into(), SortOrder::Asc)]
1084
0
                            }),
1085
2
                            max: None,
1086
2
                        }],
1087
2
                        ..Default::default()
1088
2
                    },
1089
2
                )
1090
2
                .await
1091
2
            })
1092
2
        };
1093
1
        let stream = run_ft_aggregate()?
1094
1
            .or_else(|_| async move {
1095
1
                let mut schema = vec![SearchSchema {
1096
1
                    field_name: K::INDEX_NAME.into(),
1097
1
                    alias: None,
1098
1
                    kind: SearchSchemaKind::Tag {
1099
1
                        sortable: false,
1100
1
                        unf: false,
1101
1
                        separator: None,
1102
1
                        casesensitive: false,
1103
1
                        withsuffixtrie: false,
1104
1
                        noindex: false,
1105
1
                    },
1106
1
                }];
1107
1
                if let Some(sort_key) = K::MAYBE_SORT_KEY {
  Branch (1107:24): [True: 0, False: 0]
  Branch (1107:24): [True: 0, False: 0]
  Branch (1107:24): [Folded - Ignored]
  Branch (1107:24): [Folded - Ignored]
  Branch (1107:24): [True: 0, False: 1]
1108
0
                    schema.push(SearchSchema {
1109
0
                        field_name: sort_key.into(),
1110
0
                        alias: None,
1111
0
                        kind: SearchSchemaKind::Tag {
1112
0
                            sortable: true,
1113
0
                            unf: false,
1114
0
                            separator: None,
1115
0
                            casesensitive: false,
1116
0
                            withsuffixtrie: false,
1117
0
                            noindex: false,
1118
0
                        },
1119
0
                    });
1120
1
                }
1121
1
                let create_result = self
1122
1
                    .client_pool
1123
1
                    .next()
1124
1
                    .ft_create::<(), _>(
1125
1
                        format!(
1126
1
                            "{}",
1127
1
                            get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1128
1
                        ),
1129
1
                        FtCreateOptions {
1130
1
                            on: Some(IndexKind::Hash),
1131
1
                            prefixes: vec![K::KEY_PREFIX.into()],
1132
1
                            nohl: true,
1133
1
                            nofields: true,
1134
1
                            nofreqs: true,
1135
1
                            nooffsets: true,
1136
1
                            temporary: Some(INDEX_TTL_S),
1137
1
                            ..Default::default()
1138
1
                        },
1139
1
                        schema,
1140
1
                    )
1141
1
                    .await
1142
1
                    .err_tip(|| {
1143
0
                        format!(
1144
0
                            "Error with ft_create in RedisStore::search_by_index_prefix({})",
1145
0
                            get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1146
0
                        )
1147
0
                    });
1148
1
                let run_result = run_ft_aggregate()?.await.err_tip(|| {
1149
0
                    format!(
1150
0
                        "Error with second ft_aggregate in RedisStore::search_by_index_prefix({})",
1151
0
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1152
0
                    )
1153
0
                });
1154
                // Creating the index can race, which is ok. If it fails to create, we only
1155
                // return an error if the second ft_aggregate call also fails.
1156
1
                run_result.or_else(move |e| create_result.merge(Err(e)))
1157
2
            })
1158
1
            .await?;
1159
1
        Ok(stream.map(|result| {
1160
0
            let mut redis_map =
1161
0
                result.err_tip(|| "Error in stream of in RedisStore::search_by_index_prefix")?;
1162
0
            let bytes_data = redis_map
1163
0
                .remove(&RedisKey::from_static_str(DATA_FIELD_NAME))
1164
0
                .err_tip(|| "Missing data field in RedisStore::search_by_index_prefix")?
1165
0
                .into_bytes()
1166
0
                .err_tip(|| {
1167
0
                    formatcp!("'{DATA_FIELD_NAME}' is not Bytes in RedisStore::search_by_index_prefix::into_bytes")
1168
0
                })?;
1169
0
            let version = if <K as SchedulerIndexProvider>::Versioned::VALUE {
  Branch (1169:30): [True: 0, False: 0]
  Branch (1169:30): [True: 0, False: 0]
  Branch (1169:30): [Folded - Ignored]
  Branch (1169:30): [Folded - Ignored]
  Branch (1169:30): [True: 0, False: 0]
1170
0
                redis_map
1171
0
                    .remove(&RedisKey::from_static_str(VERSION_FIELD_NAME))
1172
0
                    .err_tip(|| "Missing version field in RedisStore::search_by_index_prefix")?
1173
0
                    .as_u64()
1174
0
                    .err_tip(|| {
1175
0
                        formatcp!("'{VERSION_FIELD_NAME}' is not u64 in RedisStore::search_by_index_prefix::as_u64")
1176
0
                    })?
1177
            } else {
1178
0
                0
1179
            };
1180
0
            K::decode(version, bytes_data)
1181
0
                .err_tip(|| "In RedisStore::search_by_index_prefix::decode")
1182
0
        }))
1183
1
    }
1184
1185
2
    async fn get_and_decode<K>(
1186
2
        &self,
1187
2
        key: K,
1188
2
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
1189
2
    where
1190
2
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
1191
2
    {
1192
2
        let key = key.get_key();
1193
2
        let key = self.encode_key(&key);
1194
2
        let client = self.client_pool.next();
1195
2
        let (maybe_version, maybe_data) = client
1196
2
            .hmget::<(Option<u64>, Option<Bytes>), _, _>(
1197
2
                key.as_ref(),
1198
2
                vec![
1199
2
                    RedisKey::from(VERSION_FIELD_NAME),
1200
2
                    RedisKey::from(DATA_FIELD_NAME),
1201
2
                ],
1202
2
            )
1203
2
            .await
1204
2
            .err_tip(|| format!("In RedisStore::get_without_version::notversioned {key}"))?;
1205
2
        let Some(data) = maybe_data else {
  Branch (1205:13): [True: 0, False: 0]
  Branch (1205:13): [True: 0, False: 0]
  Branch (1205:13): [Folded - Ignored]
  Branch (1205:13): [Folded - Ignored]
  Branch (1205:13): [True: 2, False: 0]
1206
0
            return Ok(None);
1207
        };
1208
2
        Ok(Some(K::decode(maybe_version.unwrap_or(0), data).err_tip(
1209
0
            || format!("In RedisStore::get_with_version::notversioned::decode {key}"),
1210
0
        )?))
1211
2
    }
1212
}