Coverage Report

Created: 2025-10-30 00:14

/build/source/nativelink-store/src/redis_store.rs
Line  Count  Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::ops::{Bound, RangeBounds};
17
use core::pin::Pin;
18
use core::time::Duration;
19
use std::borrow::Cow;
20
use std::sync::{Arc, Weak};
21
22
use async_trait::async_trait;
23
use bytes::Bytes;
24
use const_format::formatcp;
25
use fred::clients::{Pool as RedisPool, SubscriberClient};
26
use fred::interfaces::{ClientLike, KeysInterface, PubsubInterface};
27
use fred::prelude::{Client, EventInterface, HashesInterface, RediSearchInterface};
28
use fred::types::config::{
29
    Config as RedisConfig, ConnectionConfig, PerformanceConfig, ReconnectPolicy, UnresponsiveConfig,
30
};
31
use fred::types::redisearch::{
32
    AggregateOperation, FtAggregateOptions, FtCreateOptions, IndexKind, Load, SearchField,
33
    SearchSchema, SearchSchemaKind, WithCursor,
34
};
35
use fred::types::scan::Scanner;
36
use fred::types::scripts::Script;
37
use fred::types::{Builder, Key as RedisKey, Map as RedisMap, SortOrder, Value as RedisValue};
38
use futures::stream::FuturesUnordered;
39
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future};
40
use nativelink_config::stores::{RedisMode, RedisSpec};
41
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
42
use nativelink_metric::MetricsComponent;
43
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
44
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
45
use nativelink_util::spawn;
46
use nativelink_util::store_trait::{
47
    BoolValue, RemoveItemCallback, SchedulerCurrentVersionProvider, SchedulerIndexProvider,
48
    SchedulerStore, SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
49
    SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,
50
};
51
use nativelink_util::task::JoinHandleDropGuard;
52
use parking_lot::{Mutex, RwLock};
53
use patricia_tree::StringPatriciaMap;
54
use tokio::select;
55
use tokio::time::sleep;
56
use tracing::{error, info, warn};
57
use uuid::Uuid;
58
59
use crate::cas_utils::is_zero_digest;
60
use crate::redis_utils::ft_aggregate;
61
62
/// The default size of the read chunk when reading data from Redis.
63
/// Note: If this changes it should be updated in the config documentation.
64
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;
65
66
/// The default size of the connection pool if not specified.
67
/// Note: If this changes it should be updated in the config documentation.
68
const DEFAULT_CONNECTION_POOL_SIZE: usize = 3;
69
70
/// The default delay between retries if not specified.
71
/// Note: If this changes it should be updated in the config documentation.
72
const DEFAULT_RETRY_DELAY: f32 = 0.1;
73
/// The amount of jitter to add to the retry delay if not specified.
74
/// Note: If this changes it should be updated in the config documentation.
75
const DEFAULT_RETRY_JITTER: f32 = 0.5;
76
77
/// The default maximum capacity of the broadcast channel if not specified.
78
/// Note: If this changes it should be updated in the config documentation.
79
const DEFAULT_BROADCAST_CHANNEL_CAPACITY: usize = 4096;
80
81
/// The default connection timeout in milliseconds if not specified.
82
/// Note: If this changes it should be updated in the config documentation.
83
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;
84
85
/// The default command timeout in milliseconds if not specified.
86
/// Note: If this changes it should be updated in the config documentation.
87
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;
88
89
/// The default maximum number of chunk uploads per update.
90
/// Note: If this changes it should be updated in the config documentation.
91
const DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE: usize = 10;
92
93
/// The default COUNT value passed when scanning keys in Redis.
94
/// Note: If this changes it should be updated in the config documentation.
95
const DEFAULT_SCAN_COUNT: u32 = 10_000;
96
97
/// A [`StoreDriver`] implementation that uses Redis as a backing store.
98
#[derive(Debug, MetricsComponent)]
99
pub struct RedisStore {
100
    /// The client pool connecting to the backing Redis instance(s).
101
    client_pool: RedisPool,
102
103
    /// A channel to publish updates to when a key is added, removed, or modified.
104
    #[metric(
105
        help = "The pubsub channel to publish updates to when a key is added, removed, or modified"
106
    )]
107
    pub_sub_channel: Option<String>,
108
109
    /// A redis client for managing subscriptions.
110
    /// TODO: This should be moved into the store in followups once a standard use pattern has been determined.
111
    subscriber_client: SubscriberClient,
112
113
    /// A function used to generate names for temporary keys.
114
    temp_name_generator_fn: fn() -> String,
115
116
    /// A common prefix to prepend to all keys before they are sent to Redis.
117
    ///
118
    /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`).
119
    #[metric(help = "Prefix to append to all keys before sending to Redis")]
120
    key_prefix: String,
121
122
    /// The amount of data to read from Redis at a time.
123
    #[metric(help = "The amount of data to read from Redis at a time")]
124
    read_chunk_size: usize,
125
126
    /// The maximum number of chunk uploads per update.
127
    /// This is used to limit the number of concurrent chunk uploads per update
    /// to avoid overwhelming Redis with too many simultaneous writes.
128
    #[metric(help = "The maximum number of chunk uploads per update")]
129
    max_chunk_uploads_per_update: usize,
130
131
    /// The COUNT value passed when scanning keys in Redis.
132
    /// This is used to hint the amount of work that should be done per response.
133
    #[metric(help = "The COUNT value passed when scanning keys in Redis")]
134
    scan_count: u32,
135
136
    /// Redis script used to update a value in redis if the version matches.
137
    /// This is done by incrementing the version number and then setting the new data
138
    /// only if the version number matches the existing version number.
139
    update_if_version_matches_script: Script,
140
141
    /// A manager for subscriptions to keys in Redis.
142
    subscription_manager: Mutex<Option<Arc<RedisSubscriptionManager>>>,
143
}
144
145
impl RedisStore {
146
    /// Create a new `RedisStore` from the given configuration.
147
2
    pub fn new(mut spec: RedisSpec) -> Result<Arc<Self>, Error> {
148
2
        if spec.addresses.is_empty() {
  Branch (148:12): [True: 0, False: 2]
  Branch (148:12): [Folded - Ignored]
149
0
            return Err(make_err!(
150
0
                Code::InvalidArgument,
151
0
                "No addresses were specified in redis store configuration."
152
0
            ));
153
2
        }
154
2
        let [addr] = spec.addresses.as_slice() else {
  Branch (154:13): [True: 2, False: 0]
  Branch (154:13): [Folded - Ignored]
155
0
            return Err(make_err!(
156
0
                Code::Unimplemented,
157
0
                "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discover to find the other nodes."
158
0
            ));
159
        };
160
2
        let redis_config = match spec.mode {
161
0
            RedisMode::Cluster => RedisConfig::from_url_clustered(addr),
162
0
            RedisMode::Sentinel => RedisConfig::from_url_sentinel(addr),
163
2
            RedisMode::Standard => RedisConfig::from_url_centralized(addr),
164
        }
165
2
        .err_tip_with_code(|e| {
166
0
            (
167
0
                Code::InvalidArgument,
168
0
                format!("while parsing redis node address: {e}"),
169
0
            )
170
0
        })?;
171
172
2
        let reconnect_policy = {
173
2
            if spec.retry.delay == 0.0 {
  Branch (173:16): [True: 2, False: 0]
  Branch (173:16): [Folded - Ignored]
174
2
                spec.retry.delay = DEFAULT_RETRY_DELAY;
175
2
            }
176
2
            if spec.retry.jitter == 0.0 {
  Branch (176:16): [True: 2, False: 0]
  Branch (176:16): [Folded - Ignored]
177
2
                spec.retry.jitter = DEFAULT_RETRY_JITTER;
178
2
            }
179
180
4
            let to_ms = |secs: f32| -> u32 {
181
4
                Duration::from_secs_f32(secs)
182
4
                    .as_millis()
183
4
                    .try_into()
184
4
                    .unwrap_or(u32::MAX)
185
4
            };
186
187
2
            let max_retries = u32::try_from(spec.retry.max_retries)
188
2
                .err_tip(|| "max_retries could not be converted to u32 in RedisStore::new")
?0
;
189
190
2
            let min_delay_ms = to_ms(spec.retry.delay);
191
2
            let max_delay_ms = 8000;
192
2
            let jitter = to_ms(spec.retry.jitter * spec.retry.delay);
193
194
2
            let mut reconnect_policy =
195
2
                ReconnectPolicy::new_exponential(max_retries, min_delay_ms, max_delay_ms, 2);
196
2
            reconnect_policy.set_jitter(jitter);
197
2
            reconnect_policy
198
        };
199
200
        {
201
2
            if spec.broadcast_channel_capacity == 0 {
  Branch (201:16): [True: 2, False: 0]
  Branch (201:16): [Folded - Ignored]
202
2
                spec.broadcast_channel_capacity = DEFAULT_BROADCAST_CHANNEL_CAPACITY;
203
2
            }
204
2
            if spec.connection_timeout_ms == 0 {
  Branch (204:16): [True: 2, False: 0]
  Branch (204:16): [Folded - Ignored]
205
2
                spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS;
206
2
            }
207
2
            if spec.command_timeout_ms == 0 {
  Branch (207:16): [True: 2, False: 0]
  Branch (207:16): [Folded - Ignored]
208
2
                spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS;
209
2
            }
210
2
            if spec.connection_pool_size == 0 {
  Branch (210:16): [True: 2, False: 0]
  Branch (210:16): [Folded - Ignored]
211
2
                spec.connection_pool_size = DEFAULT_CONNECTION_POOL_SIZE;
212
2
            }
213
2
            if spec.read_chunk_size == 0 {
  Branch (213:16): [True: 2, False: 0]
  Branch (213:16): [Folded - Ignored]
214
2
                spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
215
2
            }
216
2
            if spec.max_chunk_uploads_per_update == 0 {
  Branch (216:16): [True: 2, False: 0]
  Branch (216:16): [Folded - Ignored]
217
2
                spec.max_chunk_uploads_per_update = DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE;
218
2
            }
219
2
            if spec.scan_count == 0 {
  Branch (219:16): [True: 2, False: 0]
  Branch (219:16): [Folded - Ignored]
220
2
                spec.scan_count = DEFAULT_SCAN_COUNT;
221
2
            }
222
        }
223
2
        let connection_timeout = Duration::from_millis(spec.connection_timeout_ms);
224
2
        let command_timeout = Duration::from_millis(spec.command_timeout_ms);
225
226
2
        let mut builder = Builder::from_config(redis_config);
227
2
        builder
228
2
            .set_performance_config(PerformanceConfig {
229
2
                default_command_timeout: command_timeout,
230
2
                broadcast_channel_capacity: spec.broadcast_channel_capacity,
231
2
                ..Default::default()
232
2
            })
233
2
            .set_connection_config(ConnectionConfig {
234
2
                connection_timeout,
235
2
                internal_command_timeout: command_timeout,
236
2
                unresponsive: UnresponsiveConfig {
237
2
                    max_timeout: Some(connection_timeout),
238
2
                    // This number needs to be less than the connection timeout.
239
2
                    // We use 4 as it is a good balance between not spamming the server
240
2
                    // and not waiting too long.
241
2
                    interval: connection_timeout / 4,
242
2
                },
243
2
                ..Default::default()
244
2
            })
245
2
            .set_policy(reconnect_policy);
246
247
2
        let client_pool = builder
248
2
            .build_pool(spec.connection_pool_size)
249
2
            .err_tip(|| "while creating redis connection pool")
?0
;
250
251
2
        let subscriber_client = builder
252
2
            .build_subscriber_client()
253
2
            .err_tip(|| "while creating redis subscriber client")
?0
;
254
255
2
        Self::new_from_builder_and_parts(
256
2
            client_pool,
257
2
            subscriber_client,
258
2
            spec.experimental_pub_sub_channel.clone(),
259
1
            || Uuid::new_v4().to_string(),
260
2
            spec.key_prefix.clone(),
261
2
            spec.read_chunk_size,
262
2
            spec.max_chunk_uploads_per_update,
263
2
            spec.scan_count,
264
        )
265
2
        .map(Arc::new)
266
2
    }
267
268
    /// Used for testing when determinism is required.
269
    #[expect(clippy::too_many_arguments)]
270
15
    pub fn new_from_builder_and_parts(
271
15
        client_pool: RedisPool,
272
15
        subscriber_client: SubscriberClient,
273
15
        pub_sub_channel: Option<String>,
274
15
        temp_name_generator_fn: fn() -> String,
275
15
        key_prefix: String,
276
15
        read_chunk_size: usize,
277
15
        max_chunk_uploads_per_update: usize,
278
15
        scan_count: u32,
279
15
    ) -> Result<Self, Error> {
280
        // Start connection pool (this will retry forever by default).
281
15
        client_pool.connect();
282
15
        subscriber_client.connect();
283
284
15
        info!("Redis index fingerprint: {FINGERPRINT_CREATE_INDEX_HEX}");
285
286
15
        Ok(Self {
287
15
            client_pool,
288
15
            pub_sub_channel,
289
15
            subscriber_client,
290
15
            temp_name_generator_fn,
291
15
            key_prefix,
292
15
            read_chunk_size,
293
15
            max_chunk_uploads_per_update,
294
15
            scan_count,
295
15
            update_if_version_matches_script: Script::from_lua(LUA_VERSION_SET_SCRIPT),
296
15
            subscription_manager: Mutex::new(None),
297
15
        })
298
15
    }
299
300
83
    async fn get_client(&'_ self) -> Result<&'_ Client, Error> {
301
83
        let client = self.client_pool.next();
302
83
        let config = client.client_config();
303
83
        if config.mocks.is_none() {
  Branch (303:12): [True: 0, False: 0]
  Branch (303:12): [True: 2, False: 24]
  Branch (303:12): [Folded - Ignored]
  Branch (303:12): [True: 0, False: 57]
304
2
            client.wait_for_connect().await.err_tip(||
305
                format!(
306
2
                    "Connection issue connecting to redis server with hosts: {:?}, username: {}, database: {}",
307
2
                    config.server.hosts().iter().map(|s| format!("{}:{}", s.host, s.port)).collect::<Vec<String>>(),
308
2
                    config.username.unwrap_or_else(|| "None".to_string()),
309
2
                    config.database.unwrap_or_default()
310
                )
311
2
            )?;
312
81
        }
313
81
        Ok(client)
314
83
    }
315
316
    /// Encode a [`StoreKey`] so it can be sent to Redis.
317
74
    fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> {
318
74
        let key_body = key.as_str();
319
74
        if self.key_prefix.is_empty() {
  Branch (319:12): [True: 70, False: 4]
  Branch (319:12): [Folded - Ignored]
320
70
            key_body
321
        } else {
322
            // This is in the hot path for all redis operations, so we try to reuse the allocation
323
            // from `key.as_str()` if possible.
324
4
            match key_body {
325
4
                Cow::Owned(mut encoded_key) => {
326
4
                    encoded_key.insert_str(0, &self.key_prefix);
327
4
                    Cow::Owned(encoded_key)
328
                }
329
0
                Cow::Borrowed(body) => {
330
0
                    let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());
331
0
                    encoded_key.push_str(&self.key_prefix);
332
0
                    encoded_key.push_str(body);
333
0
                    Cow::Owned(encoded_key)
334
                }
335
            }
336
        }
337
74
    }
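    // Illustration (hypothetical values, not part of the original source): with
    // key_prefix = "nl:" a key "abc123" is sent to Redis as "nl:abc123"; with an
    // empty prefix the key body is passed through without an extra allocation.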
338
}
339
340
#[async_trait]
341
impl StoreDriver for RedisStore {
342
    async fn has_with_results(
343
        self: Pin<&Self>,
344
        keys: &[StoreKey<'_>],
345
        results: &mut [Option<u64>],
346
7
    ) -> Result<(), Error> {
347
        // TODO(palfrey) We could use pipeline here, but it makes retry more
348
        // difficult and it doesn't work very well in cluster mode.
349
        // If we wanted to optimize this with pipeline be careful to
350
        // implement retry and to support cluster mode.
351
        let client = self.get_client().await?;
352
        keys.iter()
353
            .zip(results.iter_mut())
354
6
            .map(|(key, result)| async move {
355
                // We need to do a special pass to ensure our zero key exists.
356
6
                if is_zero_digest(key.borrow()) {
  Branch (356:20): [True: 2, False: 4]
  Branch (356:20): [Folded - Ignored]
357
2
                    *result = Some(0);
358
2
                    return Ok::<_, Error>(());
359
4
                }
360
4
                let encoded_key = self.encode_key(key);
361
4
                let pipeline = client.pipeline();
362
4
                pipeline
363
4
                    .strlen::<(), _>(encoded_key.as_ref())
364
4
                    .await
365
4
                    .err_tip(|| {
366
0
                        format!("In RedisStore::has_with_results::strlen for {encoded_key}")
367
0
                    })?;
368
                // Redis returns 0 when the key doesn't exist
369
                // AND when the key exists with value of length 0.
370
                // Therefore, we need to check both length and existence
371
                // and do it in a pipeline for efficiency.
372
4
                pipeline
373
4
                    .exists::<(), _>(encoded_key.as_ref())
374
4
                    .await
375
4
                    .err_tip(|| {
376
0
                        format!("In RedisStore::has_with_results::exists for {encoded_key}")
377
0
                    })?;
378
4
                let (blob_len, exists) = pipeline
379
4
                    .all::<(u64, bool)>()
380
4
                    .await
381
4
                    .err_tip(|| "In RedisStore::has_with_results::query")
?0
;
382
383
4
                *result = if exists { Some(blob_len) } else { None };
  Branch (383:30): [True: 4, False: 0]
  Branch (383:30): [Folded - Ignored]
384
385
4
                Ok::<_, Error>(())
386
12
            })
387
            .collect::<FuturesUnordered<_>>()
388
            .try_collect()
389
            .await
390
7
    }
391
392
    async fn list(
393
        self: Pin<&Self>,
394
        range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
395
        handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
396
8
    ) -> Result<u64, Error> {
397
        let range = (
398
            range.0.map(StoreKey::into_owned),
399
            range.1.map(StoreKey::into_owned),
400
        );
401
        let pattern = match range.0 {
402
            Bound::Included(ref start) | Bound::Excluded(ref start) => match range.1 {
403
                Bound::Included(ref end) | Bound::Excluded(ref end) => {
404
                    let start = start.as_str();
405
                    let end = end.as_str();
406
                    let max_length = start.len().min(end.len());
407
                    let length = start
408
                        .chars()
409
                        .zip(end.chars())
410
20
                        .position(|(a, b)| a != b)
411
                        .unwrap_or(max_length);
412
                    format!("{}{}*", self.key_prefix, &start[..length])
413
                }
414
                Bound::Unbounded => format!("{}*", self.key_prefix),
415
            },
416
            Bound::Unbounded => format!("{}*", self.key_prefix),
417
        };
418
        let client = self.get_client().await?;
419
        let mut scan_stream = client.scan(pattern, Some(self.scan_count), None);
420
        let mut iterations = 0;
421
        'outer: while let Some(mut page) = scan_stream.try_next().await? {
422
            if let Some(keys) = page.take_results() {
423
                for key in keys {
424
                    // TODO: Notification of conversion errors
425
                    // Any results that do not conform to expectations are ignored.
426
                    if let Some(key) = key.as_str() {
427
                        if let Some(key) = key.strip_prefix(&self.key_prefix) {
428
                            let key = StoreKey::new_str(key);
429
                            if range.contains(&key) {
430
                                iterations += 1;
431
                                if !handler(&key) {
432
                                    break 'outer;
433
                                }
434
                            }
435
                        }
436
                    }
437
                }
438
            }
439
            page.next();
440
        }
441
        Ok(iterations)
442
8
    }
443
444
    async fn update(
445
        self: Pin<&Self>,
446
        key: StoreKey<'_>,
447
        mut reader: DropCloserReadHalf,
448
        _upload_size: UploadSizeInfo,
449
8
    ) -> Result<(), Error> {
450
        let final_key = self.encode_key(&key);
451
452
        // While the name generation function can be supplied by the user, we need to have the curly
453
        // braces in place in order to manage redis' hashing behavior and make sure that the temporary
454
        // key name and the final key name are directed to the same cluster node. See
455
        // https://redis.io/blog/redis-clustering-best-practices-with-keys/
456
        //
457
        // The TL;DR is that if we're in cluster mode and the names hash differently, we can't use request
458
        // pipelining. By using these braces, we tell redis to only hash the part of the temporary key that's
459
        // identical to the final key -- so they will always hash to the same node.
460
        let temp_key = format!(
461
            "temp-{}-{{{}}}",
462
            (self.temp_name_generator_fn)(),
463
            &final_key
464
        );
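        // Illustration (hypothetical values, not part of the original source):
        // for a final key "nl:abc123" the temp key looks like
        // "temp-550e8400-e29b-41d4-a716-446655440000-{nl:abc123}". Redis cluster
        // hashes only the "{...}" segment, so the temp and final keys always map
        // to the same hash slot and therefore the same node.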
465
466
        if is_zero_digest(key.borrow()) {
467
            let chunk = reader
468
                .peek()
469
                .await
470
                .err_tip(|| "Failed to peek in RedisStore::update")?;
471
            if chunk.is_empty() {
472
                reader
473
                    .drain()
474
                    .await
475
                    .err_tip(|| "Failed to drain in RedisStore::update")?;
476
                // Zero-digest keys are special -- we don't need to do anything with them.
477
                return Ok(());
478
            }
479
        }
480
481
        let client = self.get_client().await?;
482
483
        let mut read_stream = reader
484
6
            .scan(0u32, |bytes_read, chunk_res| {
485
6
                future::ready(Some(
486
6
                    chunk_res
487
6
                        .err_tip(|| "Failed to read chunk in update in redis store")
488
6
                        .and_then(|chunk| {
489
5
                            let offset = *bytes_read;
490
5
                            let chunk_len = u32::try_from(chunk.len()).err_tip(
491
                                || "Could not convert chunk length to u32 in RedisStore::update",
492
0
                            )?;
493
5
                            let new_bytes_read = bytes_read
494
5
                                .checked_add(chunk_len)
495
5
                                .err_tip(|| "Overflow protection in RedisStore::update")
?0
;
496
5
                            *bytes_read = new_bytes_read;
497
5
                            Ok::<_, Error>((offset, *bytes_read, chunk))
498
5
                        }),
499
                ))
500
6
            })
501
6
            .map(|res| {
502
6
                let (offset, end_pos, chunk) = res?;
503
5
                let temp_key_ref = &temp_key;
504
5
                Ok(async move {
505
5
                    client
506
5
                        .setrange::<(), _, _>(temp_key_ref, offset, chunk)
507
5
                        .await
508
5
                        .err_tip(
509
0
                            || format!("While appending to temp key ({temp_key_ref}) in RedisStore::update. offset = {offset}. end_pos = {end_pos}"),
510
0
                        )?;
511
5
                    Ok::<u32, Error>(end_pos)
512
5
                })
513
6
            })
514
            .try_buffer_unordered(self.max_chunk_uploads_per_update);
515
516
        let mut total_len: u32 = 0;
517
        while let Some(last_pos) = read_stream.try_next().await? {
518
            if last_pos > total_len {
519
                total_len = last_pos;
520
            }
521
        }
522
523
        let blob_len = client
524
            .strlen::<u64, _>(&temp_key)
525
            .await
526
0
            .err_tip(|| format!("In RedisStore::update strlen check for {temp_key}"))?;
527
        // This is a safety check to ensure that in the event some kind of retry was to happen
528
        // and the data was appended to the key twice, we reject the data.
529
        if blob_len != u64::from(total_len) {
530
            return Err(make_input_err!(
531
                "Data length mismatch in RedisStore::update for {}({}) - expected {} bytes, got {} bytes",
532
                key.borrow().as_str(),
533
                temp_key,
534
                total_len,
535
                blob_len,
536
            ));
537
        }
538
539
        // Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost.
540
        client
541
            .rename::<(), _, _>(&temp_key, final_key.as_ref())
542
            .await
543
            .err_tip(|| "While queueing key rename in RedisStore::update()")?;
544
545
        // If we have a publish channel configured, send a notice that the key has been set.
546
        if let Some(pub_sub_channel) = &self.pub_sub_channel {
547
            return Ok(client.publish(pub_sub_channel, final_key.as_ref()).await?);
548
        }
549
550
        Ok(())
551
8
    }
552
553
    async fn get_part(
554
        self: Pin<&Self>,
555
        key: StoreKey<'_>,
556
        writer: &mut DropCloserWriteHalf,
557
        offset: u64,
558
        length: Option<u64>,
559
5
    ) -> Result<(), Error> {
560
        let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?;
561
        let length = length
562
4
            .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
563
            .transpose()?;
564
565
        // To follow the RBE spec we must treat any digest with
566
        // zero size as existing.
567
        if is_zero_digest(key.borrow()) {
568
            return writer
569
                .send_eof()
570
                .err_tip(|| "Failed to send zero EOF in redis store get_part");
571
        }
572
573
        let client = self.get_client().await?;
574
        let encoded_key = self.encode_key(&key);
575
        let encoded_key = encoded_key.as_ref();
576
577
        // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we
578
        // do math with indices we change them to be exclusive at the end.
579
580
        // We want to read the data at the key from `offset` to `offset + length`.
581
        let data_start = offset;
582
        let data_end = data_start
583
            .saturating_add(length.unwrap_or(isize::MAX as usize))
584
            .saturating_sub(1);
585
586
        // And we don't ever want to read more than `read_chunk_size` bytes at a time, so we'll need to iterate.
587
        let mut chunk_start = data_start;
588
        let mut chunk_end = cmp::min(
589
            data_start.saturating_add(self.read_chunk_size) - 1,
590
            data_end,
591
        );
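        // Illustration (hypothetical values, not part of the original source):
        // with offset = 0, length = None and read_chunk_size = 64 KiB, the first
        // request is `GETRANGE key 0 65535` -- both bounds inclusive, hence the
        // `- 1` adjustments above.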
592
593
        loop {
594
            let chunk: Bytes = client
595
                .getrange(encoded_key, chunk_start, chunk_end)
596
                .await
597
                .err_tip(|| "In RedisStore::get_part::getrange")?;
598
599
            let didnt_receive_full_chunk = chunk.len() < self.read_chunk_size;
600
            let reached_end_of_data = chunk_end == data_end;
601
602
            if didnt_receive_full_chunk || reached_end_of_data {
603
                if !chunk.is_empty() {
604
                    writer
605
                        .send(chunk)
606
                        .await
607
                        .err_tip(|| "Failed to write data in RedisStore::get_part")?;
608
                }
609
610
                break; // No more data to read.
611
            }
612
613
            // We received a full chunk's worth of data, so write it...
614
            writer
615
                .send(chunk)
616
                .await
617
                .err_tip(|| "Failed to write data in RedisStore::get_part")?;
618
619
            // ...and go grab the next chunk.
620
            chunk_start = chunk_end + 1;
621
            chunk_end = cmp::min(
622
                chunk_start.saturating_add(self.read_chunk_size) - 1,
623
                data_end,
624
            );
625
        }
626
627
        // If we didn't write any data, check whether the key exists; if not, return a NotFound error.
628
        // This is required by spec.
629
        if writer.get_bytes_written() == 0 {
630
            // We're supposed to read 0 bytes, so just check if the key exists.
631
            let exists = client
632
                .exists::<bool, _>(encoded_key)
633
                .await
634
                .err_tip(|| "In RedisStore::get_part::zero_exists")?;
635
636
            if !exists {
637
                return Err(make_err!(
638
                    Code::NotFound,
639
                    "Data not found in Redis store for digest: {key:?}"
640
                ));
641
            }
642
        }
643
644
        writer
645
            .send_eof()
646
            .err_tip(|| "Failed to write EOF in redis store get_part")
647
5
    }
648
649
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
650
0
        self
651
0
    }
652
653
0
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) {
654
0
        self
655
0
    }
656
657
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> {
658
0
        self
659
0
    }
660
661
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
662
0
        registry.register_indicator(self);
663
0
    }
664
665
0
    fn register_remove_callback(
666
0
        self: Arc<Self>,
667
0
        _callback: Arc<dyn RemoveItemCallback>,
668
0
    ) -> Result<(), Error> {
669
        // As redis doesn't drop items on its own, we can just ignore this.
670
0
        Ok(())
671
0
    }
672
}
673
674
#[async_trait]
675
impl HealthStatusIndicator for RedisStore {
676
1
    fn get_name(&self) -> &'static str {
677
1
        "RedisStore"
678
1
    }
679
680
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
681
        StoreDriver::check_health(Pin::new(self), namespace).await
682
0
    }
683
}
684
685
// -------------------------------------------------------------------
686
// Below this line are specific to the redis scheduler implementation.
687
// -------------------------------------------------------------------
688
689
/// The maximum number of results to return per cursor.
690
const MAX_COUNT_PER_CURSOR: u64 = 256;
691
/// The time in milliseconds that a redis cursor can be idle before it is closed.
692
const CURSOR_IDLE_MS: u64 = 2_000;
693
/// The name of the field in the Redis hash that stores the data.
694
const DATA_FIELD_NAME: &str = "data";
695
/// The name of the field in the Redis hash that stores the version.
696
const VERSION_FIELD_NAME: &str = "version";
697
/// The time to live of indexes in seconds. After this time redis may delete the index.
698
const INDEX_TTL_S: u64 = 60 * 60 * 24; // 24 hours.
699
700
/// Lua script to set a key if the version matches.
701
/// Args:
702
///   KEYS[1]: The key where the version is stored.
703
///   ARGV[1]: The expected version.
704
///   ARGV[2]: The new data.
705
///   ARGV[3*]: Key-value pairs of additional data to include.
706
/// Returns:
707
///   The new version if the version matches. nil is returned if the
708
///   value was not set.
709
const LUA_VERSION_SET_SCRIPT: &str = formatcp!(
710
    r"
711
local key = KEYS[1]
712
local expected_version = tonumber(ARGV[1])
713
local new_data = ARGV[2]
714
local new_version = redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', 1)
715
local i
716
local indexes = {{}}
717
718
if new_version-1 ~= expected_version then
719
    redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', -1)
720
    return {{ 0, new_version-1 }}
721
end
722
-- Skip first 2 argvs, as they are known inputs.
723
-- Remember: Lua is 1-indexed.
724
for i=3, #ARGV do
725
    indexes[i-2] = ARGV[i]
726
end
727
728
-- In testing we witnessed redis sometimes not updating our FT indexes,
729
-- resulting in stale data. Deleting the key and then inserting it again
730
-- appears to work and reduces the risk significantly.
731
redis.call('DEL', key)
732
redis.call('HSET', key, '{DATA_FIELD_NAME}', new_data, '{VERSION_FIELD_NAME}', new_version, unpack(indexes))
733
734
return {{ 1, new_version }}
735
"
736
);
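// A minimal sketch (hypothetical key and argument values, not part of the
// original source) of how this script is driven via fred's
// `Script::evalsha_with_reload`, mirroring the call in `update_data` below:
//
//     let script = Script::from_lua(LUA_VERSION_SET_SCRIPT);
//     let (success, new_version): (bool, i64) = script
//         .evalsha_with_reload(client, vec!["job:123"], vec!["2", "payload"])
//         .await?;
//     // success == false means another writer bumped the version first;
//     // new_version then reports the version currently stored.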
737
738
/// This is the output of the calculations below, hardcoded into the executable.
739
const FINGERPRINT_CREATE_INDEX_HEX: &str = "3e762c15";
740
741
#[cfg(test)]
742
mod test {
743
    use super::FINGERPRINT_CREATE_INDEX_HEX;
744
745
    /// String of the `FT.CREATE` command used to create the index template.
746
    const CREATE_INDEX_TEMPLATE: &str = "FT.CREATE {} ON HASH PREFIX 1 {} NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA {} TAG CASESENSITIVE SORTABLE";
747
748
    /// Compile-time fingerprint of the `FT.CREATE` command used to create the
749
    /// index template. This is a simple CRC32 checksum of the command string.
750
    /// We don't care about it actually being a valid CRC32 checksum, just that
751
    /// it's a unique identifier with a low chance of collision.
752
1
    const fn fingerprint_create_index_template() -> u32 {
753
        const POLY: u32 = 0xEDB8_8320;
754
        const DATA: &[u8] = CREATE_INDEX_TEMPLATE.as_bytes();
755
1
        let mut crc = 0xFFFF_FFFF;
756
1
        let mut i = 0;
757
102
        while i < DATA.len() {
  Branch (757:15): [True: 101, False: 1]
758
101
            let byte = DATA[i];
759
101
            crc ^= byte as u32;
760
761
101
            let mut j = 0;
762
909
            while j < 8 {
  Branch (762:19): [True: 808, False: 101]
763
808
                crc = if crc & 1 != 0 {
  Branch (763:26): [True: 386, False: 422]
764
386
                    (crc >> 1) ^ POLY
765
                } else {
766
422
                    crc >> 1
767
                };
768
808
                j += 1;
769
            }
770
101
            i += 1;
771
        }
772
1
        crc
773
1
    }
774
775
    /// Verify that our calculation always evaluates to this fixed value.
776
    #[test]
777
1
    fn test_fingerprint_value() {
778
1
        assert_eq!(
779
1
            format!("{:08x}", &fingerprint_create_index_template()),
780
            FINGERPRINT_CREATE_INDEX_HEX,
781
        );
782
1
    }
783
}
784
785
/// Get the name of the index to create for the given field.
786
/// This will add some prefix data to the name to try to ensure that,
787
/// if the index definition changes, the index gets a new name.
788
macro_rules! get_index_name {
789
    ($prefix:expr, $field:expr, $maybe_sort:expr) => {
790
        format_args!(
791
            "{}_{}_{}_{}",
792
            $prefix,
793
            $field,
794
            $maybe_sort.unwrap_or(""),
795
            FINGERPRINT_CREATE_INDEX_HEX
796
        )
797
    };
798
}
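// For example (hypothetical inputs): get_index_name!("cas", "digest", Some("ts"))
// renders as "cas_digest_ts_3e762c15", while a `None` sort key yields
// "cas_digest__3e762c15" -- so a changed sort key or fingerprint automatically
// produces a fresh index name.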
799
800
/// Try to sanitize a string to be used as a Redis key.
801
/// We don't actually modify the string, just check if it's valid.
802
22
const fn try_sanitize(s: &str) -> Option<&str> {
803
    // Note: We cannot use for loops or iterators here because they are not const.
804
    // Using a const function here gives the compiler the ability to
805
    // optimize this function away entirely in the case where the input is constant.
806
22
    let chars = s.as_bytes();
807
22
    let mut i: usize = 0;
808
22
    let len = s.len();
809
    loop {
810
569
        if i >= len {
  Branch (810:12): [True: 22, False: 547]
  Branch (810:12): [Folded - Ignored]
811
22
            break;
812
547
        }
813
547
        let c = chars[i];
814
547
        if !c.is_ascii_alphanumeric() && c != b'_' {
  Branch (814:12): [True: 25, False: 522]
  Branch (814:42): [True: 0, False: 25]
  Branch (814:12): [Folded - Ignored]
  Branch (814:42): [Folded - Ignored]
815
0
            return None;
816
547
        }
817
547
        i += 1;
818
    }
819
22
    Some(s)
820
22
}
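// Behavior sketch (hypothetical inputs): try_sanitize("operation_id_123")
// returns Some("operation_id_123"), while try_sanitize("bad-key!") returns None
// because '-' and '!' are neither ASCII alphanumeric nor '_'.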
821
822
/// An individual subscription to a key in Redis.
823
#[derive(Debug)]
824
pub struct RedisSubscription {
825
    receiver: Option<tokio::sync::watch::Receiver<String>>,
826
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
827
}
828
829
impl SchedulerSubscription for RedisSubscription {
830
    /// Wait for the subscription key to change.
831
25
    async fn changed(&mut self) -> Result<(), Error> {
832
25
        let receiver = self
833
25
            .receiver
834
25
            .as_mut()
835
25
            .ok_or_else(|| make_err!(Code::Internal, "In RedisSubscription::changed::as_mut"))?;
836
25
        receiver
837
25
            .changed()
838
25
            .await
839
21
            .map_err(|_| make_err!(Code::Internal, "In RedisSubscription::changed::changed"))
840
21
    }
841
}
842
843
// If the subscription is dropped, we need to possibly remove the key from the
844
// subscribed keys map.
845
impl Drop for RedisSubscription {
846
6
    fn drop(&mut self) {
847
6
        let Some(receiver) = self.receiver.take() else {
  Branch (847:13): [True: 6, False: 0]
  Branch (847:13): [Folded - Ignored]
848
0
            warn!("RedisSubscription has already been dropped, nothing to do.");
849
0
            return; // Already dropped, nothing to do.
850
        };
851
6
        let key = receiver.borrow().clone();
852
        // IMPORTANT: This must be dropped before receiver_count() is called.
853
6
        drop(receiver);
854
6
        let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else {
  Branch (854:13): [True: 4, False: 2]
  Branch (854:13): [Folded - Ignored]
855
2
            return; // Already dropped, nothing to do.
856
        };
857
4
        let mut subscribed_keys = subscribed_keys.write();
858
4
        let Some(value) = subscribed_keys.get(&key) else {
  Branch (858:13): [True: 4, False: 0]
  Branch (858:13): [Folded - Ignored]
859
0
            error!(
860
0
                "Key {key} was not found in subscribed keys when checking if it should be removed."
861
            );
862
0
            return;
863
        };
864
        // If we have no receivers, cleanup the entry from our map.
865
4
        if value.receiver_count() == 0 {
  Branch (865:12): [True: 4, False: 0]
  Branch (865:12): [Folded - Ignored]
866
4
            subscribed_keys.remove(key);
867
4
        }
868
6
    }
869
}
870
871
/// A publisher for a key in Redis.
872
#[derive(Debug)]
873
struct RedisSubscriptionPublisher {
874
    sender: Mutex<tokio::sync::watch::Sender<String>>,
875
}
876
877
impl RedisSubscriptionPublisher {
878
6
    fn new(
879
6
        key: String,
880
6
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
881
6
    ) -> (Self, RedisSubscription) {
882
6
        let (sender, receiver) = tokio::sync::watch::channel(key);
883
6
        let publisher = Self {
884
6
            sender: Mutex::new(sender),
885
6
        };
886
6
        let subscription = RedisSubscription {
887
6
            receiver: Some(receiver),
888
6
            weak_subscribed_keys,
889
6
        };
890
6
        (publisher, subscription)
891
6
    }
892
893
0
    fn subscribe(
894
0
        &self,
895
0
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
896
0
    ) -> RedisSubscription {
897
0
        let receiver = self.sender.lock().subscribe();
898
0
        RedisSubscription {
899
0
            receiver: Some(receiver),
900
0
            weak_subscribed_keys,
901
0
        }
902
0
    }
903
904
4
    fn receiver_count(&self) -> usize {
905
4
        self.sender.lock().receiver_count()
906
4
    }
907
908
22
    fn notify(&self) {
909
        // TODO(https://github.com/sile/patricia_tree/issues/40) When this is addressed
910
        // we can remove the `Mutex` and use the mutable iterator directly.
911
22
        self.sender.lock().send_modify(|_| {});
912
22
    }
913
}
914
915
#[derive(Debug)]
916
pub struct RedisSubscriptionManager {
917
    subscribed_keys: Arc<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
918
    tx_for_test: tokio::sync::mpsc::UnboundedSender<String>,
919
    _subscription_spawn: JoinHandleDropGuard<()>,
920
}
921
922
impl RedisSubscriptionManager {
923
4
    pub fn new(subscribe_client: SubscriberClient, pub_sub_channel: String) -> Self {
924
4
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
925
4
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);
926
4
        let (tx_for_test, mut rx_for_test) = tokio::sync::mpsc::unbounded_channel();
927
        Self {
928
4
            subscribed_keys,
929
4
            tx_for_test,
930
4
            _subscription_spawn: spawn!("redis_subscribe_spawn", async move {
931
4
                let mut rx = subscribe_client.message_rx();
932
                loop {
933
4
                    if let Err(e) = subscribe_client.subscribe(&pub_sub_channel).await {
  Branch (933:28): [True: 0, False: 4]
  Branch (933:28): [Folded - Ignored]
934
0
                        error!("Error subscribing to pattern - {e}");
935
0
                        return;
936
4
                    }
937
4
                    let mut reconnect_rx = subscribe_client.reconnect_rx();
938
4
                    let reconnect_fut = reconnect_rx.recv().fuse();
939
4
                    tokio::pin!(reconnect_fut);
940
                    loop {
941
21
                        let key = select! {
942
21
                            value = rx_for_test.recv() => {
943
17
                                let Some(value) = value else {
  Branch (943:37): [True: 17, False: 0]
  Branch (943:37): [Folded - Ignored]
944
0
                                    unreachable!("Channel should never close");
945
                                };
946
17
                                value.into()
947
                            },
948
21
                            msg = rx.recv() => {
949
0
                                match msg {
950
0
                                    Ok(msg) => {
951
0
                                        if let RedisValue::String(s) = msg.value {
  Branch (951:48): [True: 0, False: 0]
  Branch (951:48): [Folded - Ignored]
952
0
                                            s
953
                                        } else {
954
0
                                            error!("Received non-string message in RedisSubscriptionManager");
955
0
                                            continue;
956
                                        }
957
                                    },
958
0
                                    Err(e) => {
959
                                        // Check to see if our parent has been dropped and if so kill spawn.
960
0
                                        if subscribed_keys_weak.upgrade().is_none() {
  Branch (960:44): [True: 0, False: 0]
  Branch (960:44): [Folded - Ignored]
961
0
                                            warn!("It appears our parent has been dropped, exiting RedisSubscriptionManager spawn");
962
0
                                            return;
963
0
                                        }
964
0
                                        error!("Error receiving message in RedisSubscriptionManager reconnecting and flagging everything changed - {e}");
965
0
                                        break;
966
                                    }
967
                                }
968
                            },
969
21
                            _ = &mut reconnect_fut => {
970
0
                                warn!("Redis reconnected flagging all subscriptions as changed and resuming");
971
0
                                break;
972
                            }
973
                        };
974
17
                        let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
  Branch (974:29): [True: 17, False: 0]
  Branch (974:29): [Folded - Ignored]
975
0
                            warn!(
976
0
                                "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
977
                            );
978
0
                            return;
979
                        };
980
17
                        let subscribed_keys_mux = subscribed_keys.read();
981
17
                        subscribed_keys_mux
982
17
                            .common_prefix_values(&*key)
983
17
                            .for_each(RedisSubscriptionPublisher::notify);
984
                    }
985
                    // Sleep for a small amount of time to ensure we don't reconnect too quickly.
986
0
                    sleep(Duration::from_secs(1)).await;
987
                    // If we reconnect or lag behind we might have had dirty keys, so we need to
988
                    // flag all of them as changed.
989
0
                    let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
  Branch (989:25): [True: 0, False: 0]
  Branch (989:25): [Folded - Ignored]
990
0
                        warn!(
991
0
                            "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
992
                        );
993
0
                        return;
994
                    };
995
0
                    let subscribed_keys_mux = subscribed_keys.read();
996
                    // Just in case also get a new receiver.
997
0
                    rx = subscribe_client.message_rx();
998
                    // Drop all buffered messages, then flag everything as changed.
999
0
                    rx.resubscribe();
1000
0
                    for publisher in subscribed_keys_mux.values() {
1001
0
                        publisher.notify();
1002
0
                    }
1003
                }
1004
0
            }),
1005
        }
1006
4
    }
1007
}
1008
1009
impl SchedulerSubscriptionManager for RedisSubscriptionManager {
1010
    type Subscription = RedisSubscription;
1011
1012
17
    fn notify_for_test(&self, value: String) {
1013
17
        self.tx_for_test.send(value).unwrap();
1014
17
    }
1015
1016
6
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
1017
6
    where
1018
6
        K: SchedulerStoreKeyProvider,
1019
    {
1020
6
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
1021
6
        let mut subscribed_keys = self.subscribed_keys.write();
1022
6
        let key = key.get_key();
1023
6
        let key_str = key.as_str();
1024
6
        let mut subscription = if let Some(publisher) = subscribed_keys.get(&key_str) {
  Branch (1024:39): [True: 0, False: 0]
  Branch (1024:39): [Folded - Ignored]
  Branch (1024:39): [Folded - Ignored]
  Branch (1024:39): [True: 0, False: 6]
1025
0
            publisher.subscribe(weak_subscribed_keys)
1026
        } else {
1027
6
            let (publisher, subscription) =
1028
6
                RedisSubscriptionPublisher::new(key_str.to_string(), weak_subscribed_keys);
1029
6
            subscribed_keys.insert(key_str, publisher);
1030
6
            subscription
1031
        };
1032
6
        subscription
1033
6
            .receiver
1034
6
            .as_mut()
1035
6
            .ok_or_else(|| {
1036
0
                make_err!(
1037
0
                    Code::Internal,
1038
                    "Receiver should be set in RedisSubscriptionManager::subscribe"
1039
                )
1040
0
            })?
1041
6
            .mark_changed();
1042
1043
6
        Ok(subscription)
1044
6
    }
1045
1046
2
    fn is_reliable() -> bool {
1047
2
        false
1048
2
    }
1049
}
1050
1051
impl SchedulerStore for RedisStore {
1052
    type SubscriptionManager = RedisSubscriptionManager;
1053
1054
9
    fn subscription_manager(&self) -> Result<Arc<RedisSubscriptionManager>, Error> {
1055
9
        let mut subscription_manager = self.subscription_manager.lock();
1056
1057
9
        if let Some(subscription_manager) = &*subscription_manager {
  Branch (1057:16): [True: 5, False: 4]
  Branch (1057:16): [Folded - Ignored]
1058
5
            Ok(subscription_manager.clone())
1059
        } else {
1060
4
            let Some(pub_sub_channel) = &self.pub_sub_channel else {
  Branch (1060:17): [True: 4, False: 0]
  Branch (1060:17): [Folded - Ignored]
1061
0
                return Err(make_input_err!(
1062
0
                    "RedisStore must have a pubsub channel for a Redis Scheduler if using subscriptions"
1063
0
                ));
1064
            };
1065
4
            let sub = Arc::new(RedisSubscriptionManager::new(
1066
4
                self.subscriber_client.clone(),
1067
4
                pub_sub_channel.clone(),
1068
            ));
1069
4
            *subscription_manager = Some(sub.clone());
1070
4
            Ok(sub)
1071
        }
1072
9
    }
1073
1074
19
    async fn update_data<T>(&self, data: T) -> Result<Option<i64>, Error>
1075
19
    where
1076
19
        T: SchedulerStoreDataProvider
1077
19
            + SchedulerStoreKeyProvider
1078
19
            + SchedulerCurrentVersionProvider
1079
19
            + Send,
1080
19
    {
1081
19
        let key = data.get_key();
1082
19
        let key = self.encode_key(&key);
1083
19
        let client = self.get_client().await?;
1084
19
        let maybe_index = data.get_indexes().err_tip(|| {
1085
0
            format!("Err getting index in RedisStore::update_data::versioned for {key:?}")
1086
0
        })?;
1087
19
        if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {
  Branch (1087:12): [True: 0, False: 0]
  Branch (1087:12): [True: 0, False: 0]
  Branch (1087:12): [Folded - Ignored]
  Branch (1087:12): [Folded - Ignored]
  Branch (1087:12): [True: 0, False: 4]
  Branch (1087:12): [True: 15, False: 0]
1088
15
            let current_version = data.current_version();
1089
15
            let data = data.try_into_bytes().err_tip(|| {
1090
0
                format!("Could not convert value to bytes in RedisStore::update_data::versioned for {key:?}")
1091
0
            })?;
1092
15
            let mut argv = Vec::with_capacity(3 + maybe_index.len() * 2);
1093
15
            argv.push(Bytes::from(format!("{current_version}")));
1094
15
            argv.push(data);
1095
60
            for (name, value) in maybe_index {
1096
45
                argv.push(Bytes::from_static(name.as_bytes()));
1097
45
                argv.push(value);
1098
45
            }
1099
15
            let (success, new_version): (bool, i64) = self
1100
15
                .update_if_version_matches_script
1101
15
                .evalsha_with_reload(client, vec![key.as_ref()], argv)
1102
15
                .await
1103
15
                .err_tip(|| format!("In RedisStore::update_data::versioned for {key:?}"))?;
1104
15
            if !success {
  Branch (1104:16): [True: 0, False: 0]
  Branch (1104:16): [True: 0, False: 0]
  Branch (1104:16): [Folded - Ignored]
  Branch (1104:16): [Folded - Ignored]
  Branch (1104:16): [True: 0, False: 0]
  Branch (1104:16): [True: 1, False: 14]
1105
1
                tracing::info!(
1106
1
                    "Error updating Redis key {key} expected version {current_version} but found {new_version}"
1107
                );
1108
1
                return Ok(None);
1109
14
            }
1110
            // If we have a publish channel configured, send a notice that the key has been set.
1111
14
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (1111:20): [True: 0, False: 0]
  Branch (1111:20): [True: 0, False: 0]
  Branch (1111:20): [Folded - Ignored]
  Branch (1111:20): [Folded - Ignored]
  Branch (1111:20): [True: 0, False: 0]
  Branch (1111:20): [True: 14, False: 0]
1112
14
                return Ok(client.publish(pub_sub_channel, key.as_ref()).await?);
1113
0
            }
1114
0
            Ok(Some(new_version))
1115
        } else {
1116
4
            let data = data.try_into_bytes().err_tip(|| {
1117
0
                format!("Could not convert value to bytes in RedisStore::update_data::noversion for {key:?}")
1118
0
            })?;
1119
4
            let mut fields = RedisMap::new();
1120
4
            fields.reserve(1 + maybe_index.len());
1121
4
            fields.insert(DATA_FIELD_NAME.into(), data.into());
1122
4
            for (name, value) in maybe_index {
1123
0
                fields.insert(name.into(), value.into());
1124
0
            }
1125
4
            client
1126
4
                .hset::<(), _, _>(key.as_ref(), fields)
1127
4
                .await
1128
4
                .err_tip(|| format!("In RedisStore::update_data::noversion for {key:?}"))?;
1129
            // If we have a publish channel configured, send a notice that the key has been set.
1130
4
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (1130:20): [True: 0, False: 0]
  Branch (1130:20): [True: 0, False: 0]
  Branch (1130:20): [Folded - Ignored]
  Branch (1130:20): [Folded - Ignored]
  Branch (1130:20): [True: 4, False: 0]
  Branch (1130:20): [True: 0, False: 0]
1131
4
                return Ok(client.publish(pub_sub_channel, key.as_ref()).await?);
1132
0
            }
1133
0
            Ok(Some(0)) // Always use "0" version since this is not a versioned request.
1134
        }
1135
19
    }
1136
1137
21
    async fn search_by_index_prefix<K>(
1138
21
        &self,
1139
21
        index: K,
1140
21
    ) -> Result<
1141
21
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
1142
21
        Error,
1143
21
    >
1144
21
    where
1145
21
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
1146
21
    {
1147
21
        let index_value = index.index_value();
1148
22
        let run_ft_aggregate = || {
1149
22
            let client = self.client_pool.next().clone();
1150
22
            let sanitized_field = try_sanitize(index_value.as_ref()).err_tip(|| {
1151
0
                format!("In RedisStore::search_by_index_prefix::try_sanitize - {index_value:?}")
1152
0
            })?;
1153
22
            Ok::<_, Error>(async move {
1154
22
                ft_aggregate(
1155
22
                    client,
1156
22
                    format!(
1157
22
                        "{}",
1158
22
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1159
                    ),
1160
22
                    format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_field),
1161
                    FtAggregateOptions {
1162
22
                        load: Some(Load::Some(vec![
1163
22
                            SearchField {
1164
22
                                identifier: DATA_FIELD_NAME.into(),
1165
22
                                property: None,
1166
22
                            },
1167
22
                            SearchField {
1168
22
                                identifier: VERSION_FIELD_NAME.into(),
1169
22
                                property: None,
1170
22
                            },
1171
22
                        ])),
1172
22
                        cursor: Some(WithCursor {
1173
22
                            count: Some(MAX_COUNT_PER_CURSOR),
1174
22
                            max_idle: Some(CURSOR_IDLE_MS),
1175
22
                        }),
1176
22
                        pipeline: vec![AggregateOperation::SortBy {
1177
22
                            properties: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| {
1178
17
                                vec![(format!("@{v}").into(), SortOrder::Asc)]
1179
17
                            }),
1180
22
                            max: None,
1181
                        }],
1182
22
                        ..Default::default()
1183
                    },
1184
                )
1185
22
                .await
1186
22
            })
1187
22
        };
1188
21
        let stream = run_ft_aggregate()?
1189
21
            .or_else(|_| async move {
1190
1
                let mut schema = vec![SearchSchema {
1191
1
                    field_name: K::INDEX_NAME.into(),
1192
1
                    alias: None,
1193
1
                    kind: SearchSchemaKind::Tag {
1194
1
                        sortable: false,
1195
1
                        unf: false,
1196
1
                        separator: None,
1197
1
                        casesensitive: false,
1198
1
                        withsuffixtrie: false,
1199
1
                        noindex: false,
1200
1
                    },
1201
1
                }];
1202
1
                if let Some(sort_key) = K::MAYBE_SORT_KEY {
  Branch (1202:24): [True: 0, False: 0]
  Branch (1202:24): [True: 0, False: 0]
  Branch (1202:24): [Folded - Ignored]
  Branch (1202:24): [Folded - Ignored]
  Branch (1202:24): [True: 0, False: 0]
  Branch (1202:24): [True: 0, False: 1]
1203
0
                    schema.push(SearchSchema {
1204
0
                        field_name: sort_key.into(),
1205
0
                        alias: None,
1206
0
                        kind: SearchSchemaKind::Tag {
1207
0
                            sortable: true,
1208
0
                            unf: false,
1209
0
                            separator: None,
1210
0
                            casesensitive: false,
1211
0
                            withsuffixtrie: false,
1212
0
                            noindex: false,
1213
0
                        },
1214
0
                    });
1215
1
                }
1216
1
                let create_result = self
1217
1
                    .client_pool
1218
1
                    .next()
1219
1
                    .ft_create::<(), _>(
1220
1
                        format!(
1221
1
                            "{}",
1222
1
                            get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1223
1
                        ),
1224
1
                        FtCreateOptions {
1225
1
                            on: Some(IndexKind::Hash),
1226
1
                            prefixes: vec![K::KEY_PREFIX.into()],
1227
1
                            nohl: true,
1228
1
                            nofields: true,
1229
1
                            nofreqs: true,
1230
1
                            nooffsets: true,
1231
1
                            temporary: Some(INDEX_TTL_S),
1232
1
                            ..Default::default()
1233
1
                        },
1234
1
                        schema,
1235
1
                    )
1236
1
                    .await
1237
1
                    .err_tip(|| {
1238
0
                        format!(
1239
0
                            "Error with ft_create in RedisStore::search_by_index_prefix({})",
1240
0
                            get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1241
                        )
1242
0
                    });
1243
1
                let run_result = run_ft_aggregate()?.await.err_tip(|| {
1244
0
                    format!(
1245
0
                        "Error with second ft_aggregate in RedisStore::search_by_index_prefix({})",
1246
0
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1247
                    )
1248
0
                });
1249
                // Creating the index can race with other workers, which is ok. If the
1250
                // create fails, we only error if the second ft_aggregate call also fails.
1251
1
                run_result.or_else(move |e| create_result.merge(Err(e)))
1252
2
            })
1253
21
            .await?;
1254
21
        Ok(stream.map(|result| {
1255
7
            let mut redis_map =
1256
7
                result.err_tip(|| "Error in stream of in RedisStore::search_by_index_prefix")
?0
;
1257
7
            let bytes_data = redis_map
1258
7
                .remove(&RedisKey::from_static_str(DATA_FIELD_NAME))
1259
7
                .err_tip(|| "Missing data field in RedisStore::search_by_index_prefix")
?0
1260
7
                .into_bytes()
1261
7
                .err_tip(|| {
1262
0
                    formatcp!("'{DATA_FIELD_NAME}' is not Bytes in RedisStore::search_by_index_prefix::into_bytes")
1263
0
                })?;
1264
7
            let version = if <K as SchedulerIndexProvider>::Versioned::VALUE {
  Branch (1264:30): [True: 0, False: 0]
  Branch (1264:30): [True: 0, False: 0]
  Branch (1264:30): [Folded - Ignored]
  Branch (1264:30): [Folded - Ignored]
  Branch (1264:30): [True: 5, False: 0]
  Branch (1264:30): [True: 2, False: 0]
1265
7
                redis_map
1266
7
                    .remove(&RedisKey::from_static_str(VERSION_FIELD_NAME))
1267
7
                    .err_tip(|| "Missing version field in RedisStore::search_by_index_prefix")
?0
1268
7
                    .as_i64()
1269
7
                    .err_tip(|| {
1270
0
                        formatcp!("'{VERSION_FIELD_NAME}' is not u64 in RedisStore::search_by_index_prefix::as_u64")
1271
0
                    })?
1272
            } else {
1273
0
                0
1274
            };
1275
7
            K::decode(version, bytes_data)
1276
7
                .err_tip(|| "In RedisStore::search_by_index_prefix::decode")
1277
7
        }))
1278
21
    }
1279
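The or_else fallback above implements a create-on-miss pattern: run the aggregation, and only if it fails attempt FT.CREATE and retry, surfacing the create error only when the retry also fails. A generic sketch of that control flow (a hypothetical helper, simplified to String errors, not part of RedisStore):

    use core::future::Future;

    async fn with_index_fallback<T, RFut, CFut>(
        run: impl Fn() -> RFut,
        create_index: impl FnOnce() -> CFut,
    ) -> Result<T, String>
    where
        RFut: Future<Output = Result<T, String>>,
        CFut: Future<Output = Result<(), String>>,
    {
        if let Ok(found) = run().await {
            return Ok(found);
        }
        // Index creation may race with other workers; that is fine. Keep the
        // create error around and only report it if the retry also fails.
        let create_result = create_index().await;
        run().await.map_err(|e| create_result.err().unwrap_or(e))
    }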
1280
38
    async fn get_and_decode<K>(
1281
38
        &self,
1282
38
        key: K,
1283
38
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
1284
38
    where
1285
38
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
1286
38
    {
1287
38
        let key = key.get_key();
1288
38
        let key = self.encode_key(&key);
1289
38
        let client = self.get_client().await?;
1290
38
        let (maybe_version, maybe_data) = client
1291
38
            .hmget::<(Option<i64>, Option<Bytes>), _, _>(
1292
38
                key.as_ref(),
1293
38
                vec![
1294
38
                    RedisKey::from(VERSION_FIELD_NAME),
1295
38
                    RedisKey::from(DATA_FIELD_NAME),
1296
38
                ],
1297
38
            )
1298
38
            .await
1299
38
            .err_tip(|| format!("In RedisStore::get_without_version::notversioned {key}"))?;
1300
37
        let Some(data) = maybe_data else {
  Branch (1300:13): [True: 0, False: 0]
  Branch (1300:13): [True: 0, False: 0]
  Branch (1300:13): [Folded - Ignored]
  Branch (1300:13): [Folded - Ignored]
  Branch (1300:13): [True: 3, False: 0]
  Branch (1300:13): [True: 34, False: 0]
1301
0
            return Ok(None);
1302
        };
1303
37
        Ok(Some(K::decode(maybe_version.unwrap_or(0), data).err_tip(
1304
0
            || format!("In RedisStore::get_with_version::notversioned::decode {key}"),
1305
0
        )?))
1306
38
    }
1307
}
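For reference, the HMGET shape used by get_and_decode fetches both hash fields in one round trip and decodes each as optional, so a missing entry (or a missing version on unversioned entries) does not fail the call. A standalone sketch, assuming a connected fred Client and illustrative field names "version" and "data" (the store itself uses its VERSION_FIELD_NAME/DATA_FIELD_NAME constants):

    use bytes::Bytes;
    use fred::error::Error;
    use fred::prelude::{Client, HashesInterface};

    async fn fetch(client: &Client, key: &str) -> Result<Option<(i64, Bytes)>, Error> {
        // Absent fields come back as None instead of an error.
        let (version, data): (Option<i64>, Option<Bytes>) =
            client.hmget(key, vec!["version", "data"]).await?;
        // Mirror get_and_decode: missing data means the entry does not exist,
        // and a missing version defaults to 0.
        Ok(data.map(|bytes| (version.unwrap_or(0), bytes)))
    }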