Coverage Report

Created: 2025-07-30 16:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/redis_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::ops::{Bound, RangeBounds};
17
use core::pin::Pin;
18
use core::time::Duration;
19
use std::borrow::Cow;
20
use std::sync::{Arc, Weak};
21
22
use async_trait::async_trait;
23
use bytes::Bytes;
24
use const_format::formatcp;
25
use fred::clients::{Pool as RedisPool, SubscriberClient};
26
use fred::interfaces::{ClientLike, KeysInterface, PubsubInterface};
27
use fred::prelude::{Client, EventInterface, HashesInterface, RediSearchInterface};
28
use fred::types::config::{
29
    Config as RedisConfig, ConnectionConfig, PerformanceConfig, ReconnectPolicy, UnresponsiveConfig,
30
};
31
use fred::types::redisearch::{
32
    AggregateOperation, FtAggregateOptions, FtCreateOptions, IndexKind, Load, SearchField,
33
    SearchSchema, SearchSchemaKind, WithCursor,
34
};
35
use fred::types::scan::Scanner;
36
use fred::types::scripts::Script;
37
use fred::types::{Builder, Key as RedisKey, Map as RedisMap, SortOrder, Value as RedisValue};
38
use futures::stream::FuturesUnordered;
39
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future};
40
use nativelink_config::stores::{RedisMode, RedisSpec};
41
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
42
use nativelink_metric::MetricsComponent;
43
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
44
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
45
use nativelink_util::spawn;
46
use nativelink_util::store_trait::{
47
    BoolValue, SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore,
48
    SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
49
    SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,
50
};
51
use nativelink_util::task::JoinHandleDropGuard;
52
use parking_lot::{Mutex, RwLock};
53
use patricia_tree::StringPatriciaMap;
54
use tokio::select;
55
use tokio::time::sleep;
56
use tracing::{error, info, warn};
57
use uuid::Uuid;
58
59
use crate::cas_utils::is_zero_digest;
60
use crate::redis_utils::ft_aggregate;
61
62
/// The default size of the read chunk when reading data from Redis.
63
/// Note: If this changes it should be updated in the config documentation.
64
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;
65
66
/// The default size of the connection pool if not specified.
67
/// Note: If this changes it should be updated in the config documentation.
68
const DEFAULT_CONNECTION_POOL_SIZE: usize = 3;
69
70
/// The default delay between retries if not specified.
71
/// Note: If this changes it should be updated in the config documentation.
72
const DEFAULT_RETRY_DELAY: f32 = 0.1;
73
/// The amount of jitter to add to the retry delay if not specified.
74
/// Note: If this changes it should be updated in the config documentation.
75
const DEFAULT_RETRY_JITTER: f32 = 0.5;
76
77
/// The default maximum capacity of the broadcast channel if not specified.
78
/// Note: If this changes it should be updated in the config documentation.
79
const DEFAULT_BROADCAST_CHANNEL_CAPACITY: usize = 4096;
80
81
/// The default connection timeout in milliseconds if not specified.
82
/// Note: If this changes it should be updated in the config documentation.
83
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;
84
85
/// The default command timeout in milliseconds if not specified.
86
/// Note: If this changes it should be updated in the config documentation.
87
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;
88
89
/// The default maximum number of chunk uploads per update.
90
/// Note: If this changes it should be updated in the config documentation.
91
const DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE: usize = 10;
92
93
/// The default COUNT value passed when scanning keys in Redis.
94
/// Note: If this changes it should be updated in the config documentation.
95
const DEFAULT_SCAN_COUNT: u32 = 10_000;
96
97
/// A [`StoreDriver`] implementation that uses Redis as a backing store.
98
#[derive(Debug, MetricsComponent)]
99
pub struct RedisStore {
100
    /// The client pool connecting to the backing Redis instance(s).
101
    client_pool: RedisPool,
102
103
    /// A channel to publish updates to when a key is added, removed, or modified.
104
    #[metric(
105
        help = "The pubsub channel to publish updates to when a key is added, removed, or modified"
106
    )]
107
    pub_sub_channel: Option<String>,
108
109
    /// A redis client for managing subscriptions.
110
    /// TODO: This should be moved into the store in followups once a standard use pattern has been determined.
111
    subscriber_client: SubscriberClient,
112
113
    /// A function used to generate names for temporary keys.
114
    temp_name_generator_fn: fn() -> String,
115
116
    /// A common prefix to append to all keys before they are sent to Redis.
117
    ///
118
    /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`).
119
    #[metric(help = "Prefix to append to all keys before sending to Redis")]
120
    key_prefix: String,
121
122
    /// The amount of data to read from Redis at a time.
123
    #[metric(help = "The amount of data to read from Redis at a time")]
124
    read_chunk_size: usize,
125
126
    /// The maximum number of chunk uploads per update.
127
    /// This is used to limit the number of chunk uploads per update to prevent
128
    #[metric(help = "The maximum number of chunk uploads per update")]
129
    max_chunk_uploads_per_update: usize,
130
131
    /// The COUNT value passed when scanning keys in Redis.
132
    /// This is used to hint the amount of work that should be done per response.
133
    #[metric(help = "The COUNT value passed when scanning keys in Redis")]
134
    scan_count: u32,
135
136
    /// Redis script used to update a value in redis if the version matches.
137
    /// This is done by incrementing the version number and then setting the new data
138
    /// only if the version number matches the existing version number.
139
    update_if_version_matches_script: Script,
140
141
    /// A manager for subscriptions to keys in Redis.
142
    subscription_manager: Mutex<Option<Arc<RedisSubscriptionManager>>>,
143
}
144
145
impl RedisStore {
146
    /// Create a new `RedisStore` from the given configuration.
147
1
    pub fn new(mut spec: RedisSpec) -> Result<Arc<Self>, Error> {
148
1
        if spec.addresses.is_empty() {
  Branch (148:12): [True: 0, False: 1]
  Branch (148:12): [Folded - Ignored]
149
0
            return Err(make_err!(
150
0
                Code::InvalidArgument,
151
0
                "No addresses were specified in redis store configuration."
152
0
            ));
153
1
        }
154
1
        let [addr] = spec.addresses.as_slice() else {
  Branch (154:13): [True: 1, False: 0]
  Branch (154:13): [Folded - Ignored]
155
0
            return Err(make_err!(
156
0
                Code::Unimplemented,
157
0
                "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discover to find the other nodes."
158
0
            ));
159
        };
160
1
        let redis_config = match spec.mode {
161
0
            RedisMode::Cluster => RedisConfig::from_url_clustered(addr),
162
0
            RedisMode::Sentinel => RedisConfig::from_url_sentinel(addr),
163
1
            RedisMode::Standard => RedisConfig::from_url_centralized(addr),
164
        }
165
1
        .err_tip_with_code(|e| 
{0
166
0
            (
167
0
                Code::InvalidArgument,
168
0
                format!("while parsing redis node address: {e}"),
169
0
            )
170
0
        })?;
171
172
1
        let reconnect_policy = {
173
1
            if spec.retry.delay == 0.0 {
  Branch (173:16): [True: 1, False: 0]
  Branch (173:16): [Folded - Ignored]
174
1
                spec.retry.delay = DEFAULT_RETRY_DELAY;
175
1
            
}0
176
1
            if spec.retry.jitter == 0.0 {
  Branch (176:16): [True: 1, False: 0]
  Branch (176:16): [Folded - Ignored]
177
1
                spec.retry.jitter = DEFAULT_RETRY_JITTER;
178
1
            
}0
179
180
1
            let max_retries = u32::try_from(spec.retry.max_retries)
181
1
                .err_tip(|| "max_retries could not be converted to u32 in RedisStore::new")
?0
;
182
1
            let min_delay_ms = (spec.retry.delay * 1000.0) as u32;
183
1
            let max_delay_ms = 8000;
184
1
            let jitter = (spec.retry.jitter * spec.retry.delay * 1000.0) as u32;
185
186
1
            let mut reconnect_policy = ReconnectPolicy::new_exponential(
187
1
                max_retries,  /* max_retries, 0 is unlimited */
188
1
                min_delay_ms, /* min_delay */
189
1
                max_delay_ms, /* max_delay */
190
                2,            /* mult */
191
            );
192
1
            reconnect_policy.set_jitter(jitter);
193
1
            reconnect_policy
194
        };
195
196
        {
197
1
            if spec.broadcast_channel_capacity == 0 {
  Branch (197:16): [True: 1, False: 0]
  Branch (197:16): [Folded - Ignored]
198
1
                spec.broadcast_channel_capacity = DEFAULT_BROADCAST_CHANNEL_CAPACITY;
199
1
            
}0
200
1
            if spec.connection_timeout_ms == 0 {
  Branch (200:16): [True: 1, False: 0]
  Branch (200:16): [Folded - Ignored]
201
1
                spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS;
202
1
            
}0
203
1
            if spec.command_timeout_ms == 0 {
  Branch (203:16): [True: 1, False: 0]
  Branch (203:16): [Folded - Ignored]
204
1
                spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS;
205
1
            
}0
206
1
            if spec.connection_pool_size == 0 {
  Branch (206:16): [True: 1, False: 0]
  Branch (206:16): [Folded - Ignored]
207
1
                spec.connection_pool_size = DEFAULT_CONNECTION_POOL_SIZE;
208
1
            
}0
209
1
            if spec.read_chunk_size == 0 {
  Branch (209:16): [True: 1, False: 0]
  Branch (209:16): [Folded - Ignored]
210
1
                spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
211
1
            
}0
212
1
            if spec.max_chunk_uploads_per_update == 0 {
  Branch (212:16): [True: 1, False: 0]
  Branch (212:16): [Folded - Ignored]
213
1
                spec.max_chunk_uploads_per_update = DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE;
214
1
            
}0
215
1
            if spec.scan_count == 0 {
  Branch (215:16): [True: 1, False: 0]
  Branch (215:16): [Folded - Ignored]
216
1
                spec.scan_count = DEFAULT_SCAN_COUNT;
217
1
            
}0
218
        }
219
1
        let connection_timeout = Duration::from_millis(spec.connection_timeout_ms);
220
1
        let command_timeout = Duration::from_millis(spec.command_timeout_ms);
221
222
1
        let mut builder = Builder::from_config(redis_config);
223
1
        builder
224
1
            .set_performance_config(PerformanceConfig {
225
1
                default_command_timeout: command_timeout,
226
1
                broadcast_channel_capacity: spec.broadcast_channel_capacity,
227
1
                ..Default::default()
228
1
            })
229
1
            .set_connection_config(ConnectionConfig {
230
1
                connection_timeout,
231
1
                internal_command_timeout: command_timeout,
232
1
                unresponsive: UnresponsiveConfig {
233
1
                    max_timeout: Some(connection_timeout),
234
1
                    // This number needs to be less than the connection timeout.
235
1
                    // We use 4 as it is a good balance between not spamming the server
236
1
                    // and not waiting too long.
237
1
                    interval: connection_timeout / 4,
238
1
                },
239
1
                ..Default::default()
240
1
            })
241
1
            .set_policy(reconnect_policy);
242
243
1
        let client_pool = builder
244
1
            .build_pool(spec.connection_pool_size)
245
1
            .err_tip(|| "while creating redis connection pool")
?0
;
246
247
1
        let subscriber_client = builder
248
1
            .build_subscriber_client()
249
1
            .err_tip(|| "while creating redis subscriber client")
?0
;
250
251
1
        Self::new_from_builder_and_parts(
252
1
            client_pool,
253
1
            subscriber_client,
254
1
            spec.experimental_pub_sub_channel.clone(),
255
0
            || Uuid::new_v4().to_string(),
256
1
            spec.key_prefix.clone(),
257
1
            spec.read_chunk_size,
258
1
            spec.max_chunk_uploads_per_update,
259
1
            spec.scan_count,
260
        )
261
1
        .map(Arc::new)
262
1
    }
263
264
    /// Used for testing when determinism is required.
265
    #[expect(clippy::too_many_arguments)]
266
12
    pub fn new_from_builder_and_parts(
267
12
        client_pool: RedisPool,
268
12
        subscriber_client: SubscriberClient,
269
12
        pub_sub_channel: Option<String>,
270
12
        temp_name_generator_fn: fn() -> String,
271
12
        key_prefix: String,
272
12
        read_chunk_size: usize,
273
12
        max_chunk_uploads_per_update: usize,
274
12
        scan_count: u32,
275
12
    ) -> Result<Self, Error> {
276
        // Start connection pool (this will retry forever by default).
277
12
        client_pool.connect();
278
12
        subscriber_client.connect();
279
280
12
        info!("Redis index fingerprint: {FINGERPRINT_CREATE_INDEX_HEX}");
281
282
12
        Ok(Self {
283
12
            client_pool,
284
12
            pub_sub_channel,
285
12
            subscriber_client,
286
12
            temp_name_generator_fn,
287
12
            key_prefix,
288
12
            read_chunk_size,
289
12
            max_chunk_uploads_per_update,
290
12
            scan_count,
291
12
            update_if_version_matches_script: Script::from_lua(LUA_VERSION_SET_SCRIPT),
292
12
            subscription_manager: Mutex::new(None),
293
12
        })
294
12
    }
295
296
38
    async fn get_client(&'_ self) -> Result<&'_ Client, Error> {
297
38
        let client = self.client_pool.next();
298
38
        let config = client.client_config();
299
38
        if config.mocks.is_none() {
  Branch (299:12): [True: 0, False: 0]
  Branch (299:12): [True: 1, False: 24]
  Branch (299:12): [Folded - Ignored]
  Branch (299:12): [True: 0, False: 13]
300
1
            client.wait_for_connect().await.err_tip(||
301
                format!(
302
1
                    "Connection issue connecting to redis server with hosts: {:?}, username: {}, database: {}",
303
1
                    config.server.hosts().iter().map(|s| format!("{}:{}", s.host, s.port)).collect::<Vec<String>>(),
304
1
                    config.username.unwrap_or_else(|| "None".to_string()),
305
1
                    config.database.unwrap_or_default()
306
                )
307
1
            )?;
308
37
        }
309
37
        Ok(client)
310
38
    }
311
312
    /// Encode a [`StoreKey`] so it can be sent to Redis.
313
29
    fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> {
314
29
        let key_body = key.as_str();
315
29
        if self.key_prefix.is_empty() {
  Branch (315:12): [True: 25, False: 4]
  Branch (315:12): [Folded - Ignored]
316
25
            key_body
317
        } else {
318
            // This is in the hot path for all redis operations, so we try to reuse the allocation
319
            // from `key.as_str()` if possible.
320
4
            match key_body {
321
4
                Cow::Owned(mut encoded_key) => {
322
4
                    encoded_key.insert_str(0, &self.key_prefix);
323
4
                    Cow::Owned(encoded_key)
324
                }
325
0
                Cow::Borrowed(body) => {
326
0
                    let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());
327
0
                    encoded_key.push_str(&self.key_prefix);
328
0
                    encoded_key.push_str(body);
329
0
                    Cow::Owned(encoded_key)
330
                }
331
            }
332
        }
333
29
    }
334
}
335
336
#[async_trait]
337
impl StoreDriver for RedisStore {
338
    async fn has_with_results(
339
        self: Pin<&Self>,
340
        keys: &[StoreKey<'_>],
341
        results: &mut [Option<u64>],
342
14
    ) -> Result<(), Error> {
343
        // TODO(aaronmondal) We could use pipeline here, but it makes retry more
344
        // difficult and it doesn't work very well in cluster mode.
345
        // If we wanted to optimize this with pipeline be careful to
346
        // implement retry and to support cluster mode.
347
7
        let 
client6
= self.get_client().await
?1
;
348
6
        keys.iter()
349
6
            .zip(results.iter_mut())
350
6
            .map(|(key, result)| async move {
351
                // We need to do a special pass to ensure our zero key exist.
352
6
                if is_zero_digest(key.borrow()) {
  Branch (352:20): [True: 2, False: 4]
  Branch (352:20): [Folded - Ignored]
353
2
                    *result = Some(0);
354
2
                    return Ok::<_, Error>(());
355
4
                }
356
4
                let encoded_key = self.encode_key(key);
357
4
                let pipeline = client.pipeline();
358
4
                pipeline
359
4
                    .strlen::<(), _>(encoded_key.as_ref())
360
4
                    .await
361
4
                    .err_tip(|| 
{0
362
0
                        format!("In RedisStore::has_with_results::strlen for {encoded_key}")
363
0
                    })?;
364
                // Redis returns 0 when the key doesn't exist
365
                // AND when the key exists with value of length 0.
366
                // Therefore, we need to check both length and existence
367
                // and do it in a pipeline for efficiency.
368
4
                pipeline
369
4
                    .exists::<(), _>(encoded_key.as_ref())
370
4
                    .await
371
4
                    .err_tip(|| 
{0
372
0
                        format!("In RedisStore::has_with_results::exists for {encoded_key}")
373
0
                    })?;
374
4
                let (blob_len, exists) = pipeline
375
4
                    .all::<(u64, bool)>()
376
4
                    .await
377
4
                    .err_tip(|| "In RedisStore::has_with_results::query")
?0
;
378
379
4
                *result = if exists { Some(blob_len) } else { 
None0
};
  Branch (379:30): [True: 4, False: 0]
  Branch (379:30): [Folded - Ignored]
380
381
4
                Ok::<_, Error>(())
382
12
            })
383
6
            .collect::<FuturesUnordered<_>>()
384
6
            .try_collect()
385
6
            .await
386
14
    }
387
388
    async fn list(
389
        self: Pin<&Self>,
390
        range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
391
        handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
392
16
    ) -> Result<u64, Error> {
393
8
        let range = (
394
8
            range.0.map(StoreKey::into_owned),
395
8
            range.1.map(StoreKey::into_owned),
396
8
        );
397
8
        let pattern = match range.0 {
398
6
            Bound::Included(ref start) | Bound::Excluded(
ref start0
) => match range.1 {
399
3
                Bound::Included(
ref end2
) | Bound::Excluded(ref end) => {
400
5
                    let start = start.as_str();
401
5
                    let end = end.as_str();
402
5
                    let max_length = start.len().min(end.len());
403
5
                    let length = start
404
5
                        .chars()
405
5
                        .zip(end.chars())
406
20
                        .
position5
(|(a, b)| a != b)
407
5
                        .unwrap_or(max_length);
408
5
                    format!("{}{}*", self.key_prefix, &start[..length])
409
                }
410
1
                Bound::Unbounded => format!("{}*", self.key_prefix),
411
            },
412
2
            Bound::Unbounded => format!("{}*", self.key_prefix),
413
        };
414
8
        let client = self.get_client().await
?0
;
415
8
        let mut scan_stream = client.scan(pattern, Some(self.scan_count), None);
416
8
        let mut iterations = 0;
417
15
        'outer: while let Some(
mut page7
) = scan_stream.try_next().await
?0
{
  Branch (417:27): [True: 7, False: 8]
  Branch (417:27): [Folded - Ignored]
418
7
            if let Some(keys) = page.take_results() {
  Branch (418:20): [True: 7, False: 0]
  Branch (418:20): [Folded - Ignored]
419
28
                for 
key21
in keys {
420
                    // TODO: Notification of conversion errors
421
                    // Any results that do not conform to expectations are ignored.
422
21
                    if let Some(key) = key.as_str() {
  Branch (422:28): [True: 21, False: 0]
  Branch (422:28): [Folded - Ignored]
423
21
                        if let Some(key) = key.strip_prefix(&self.key_prefix) {
  Branch (423:32): [True: 21, False: 0]
  Branch (423:32): [Folded - Ignored]
424
21
                            let key = StoreKey::new_str(key);
425
21
                            if range.contains(&key) {
  Branch (425:32): [True: 13, False: 8]
  Branch (425:32): [Folded - Ignored]
426
13
                                iterations += 1;
427
13
                                if !handler(&key) {
  Branch (427:36): [True: 0, False: 13]
  Branch (427:36): [Folded - Ignored]
428
0
                                    break 'outer;
429
13
                                }
430
8
                            }
431
0
                        }
432
0
                    }
433
                }
434
0
            }
435
7
            page.next();
436
        }
437
8
        Ok(iterations)
438
16
    }
439
440
    async fn update(
441
        self: Pin<&Self>,
442
        key: StoreKey<'_>,
443
        mut reader: DropCloserReadHalf,
444
        _upload_size: UploadSizeInfo,
445
14
    ) -> Result<(), Error> {
446
7
        let final_key = self.encode_key(&key);
447
448
        // While the name generation function can be supplied by the user, we need to have the curly
449
        // braces in place in order to manage redis' hashing behavior and make sure that the temporary
450
        // key name and the final key name are directed to the same cluster node. See
451
        // https://redis.io/blog/redis-clustering-best-practices-with-keys/
452
        //
453
        // The TL;DR is that if we're in cluster mode and the names hash differently, we can't use request
454
        // pipelining. By using these braces, we tell redis to only hash the part of the temporary key that's
455
        // identical to the final key -- so they will always hash to the same node.
456
7
        let temp_key = format!(
457
7
            "temp-{}-{{{}}}",
458
7
            (self.temp_name_generator_fn)(),
459
7
            &final_key
460
        );
461
462
7
        if is_zero_digest(key.borrow()) {
  Branch (462:12): [True: 2, False: 5]
  Branch (462:12): [Folded - Ignored]
463
2
            let chunk = reader
464
2
                .peek()
465
2
                .await
466
2
                .err_tip(|| "Failed to peek in RedisStore::update")
?0
;
467
2
            if chunk.is_empty() {
  Branch (467:16): [True: 2, False: 0]
  Branch (467:16): [Folded - Ignored]
468
2
                reader
469
2
                    .drain()
470
2
                    .await
471
2
                    .err_tip(|| "Failed to drain in RedisStore::update")
?0
;
472
                // Zero-digest keys are special -- we don't need to do anything with it.
473
2
                return Ok(());
474
0
            }
475
5
        }
476
477
5
        let client = self.get_client().await
?0
;
478
479
5
        let mut read_stream = reader
480
6
            .
scan5
(0u32, |bytes_read, chunk_res| {
481
6
                future::ready(Some(
482
6
                    chunk_res
483
6
                        .err_tip(|| "Failed to read chunk in update in redis store")
484
6
                        .and_then(|chunk| 
{5
485
5
                            let offset = *bytes_read;
486
5
                            let chunk_len = u32::try_from(chunk.len()).err_tip(
487
                                || "Could not convert chunk length to u32 in RedisStore::update",
488
0
                            )?;
489
5
                            let new_bytes_read = bytes_read
490
5
                                .checked_add(chunk_len)
491
5
                                .err_tip(|| "Overflow protection in RedisStore::update")
?0
;
492
5
                            *bytes_read = new_bytes_read;
493
5
                            Ok::<_, Error>((offset, *bytes_read, chunk))
494
5
                        }),
495
                ))
496
6
            })
497
6
            .
map5
(|res| {
498
6
                let (
offset5
,
end_pos5
,
chunk5
) = res
?1
;
499
5
                let temp_key_ref = &temp_key;
500
5
                Ok(async move {
501
5
                    client
502
5
                        .setrange::<(), _, _>(temp_key_ref, offset, chunk)
503
5
                        .await
504
5
                        .err_tip(
505
0
                            || format!("While appending to temp key ({temp_key_ref}) in RedisStore::update. offset = {offset}. end_pos = {end_pos}"),
506
0
                        )?;
507
5
                    Ok::<u32, Error>(end_pos)
508
5
                })
509
6
            })
510
5
            .try_buffer_unordered(self.max_chunk_uploads_per_update);
511
512
5
        let mut total_len: u32 = 0;
513
10
        while let Some(
last_pos5
) = read_stream.try_next().await
?1
{
  Branch (513:19): [True: 5, False: 4]
  Branch (513:19): [Folded - Ignored]
514
5
            if last_pos > total_len {
  Branch (514:16): [True: 5, False: 0]
  Branch (514:16): [Folded - Ignored]
515
5
                total_len = last_pos;
516
5
            
}0
517
        }
518
519
4
        let blob_len = client
520
4
            .strlen::<u64, _>(&temp_key)
521
4
            .await
522
4
            .err_tip(|| format!(
"In RedisStore::update strlen check for {temp_key}"0
))
?0
;
523
        // This is a safety check to ensure that in the event some kind of retry was to happen
524
        // and the data was appended to the key twice, we reject the data.
525
4
        if blob_len != u64::from(total_len) {
  Branch (525:12): [True: 0, False: 4]
  Branch (525:12): [Folded - Ignored]
526
0
            return Err(make_input_err!(
527
0
                "Data length mismatch in RedisStore::update for {}({}) - expected {} bytes, got {} bytes",
528
0
                key.borrow().as_str(),
529
0
                temp_key,
530
0
                total_len,
531
0
                blob_len,
532
0
            ));
533
4
        }
534
535
        // Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost.
536
4
        client
537
4
            .rename::<(), _, _>(&temp_key, final_key.as_ref())
538
4
            .await
539
4
            .err_tip(|| "While queueing key rename in RedisStore::update()")
?0
;
540
541
        // If we have a publish channel configured, send a notice that the key has been set.
542
4
        if let Some(
pub_sub_channel0
) = &self.pub_sub_channel {
  Branch (542:16): [True: 0, False: 4]
  Branch (542:16): [Folded - Ignored]
543
0
            return Ok(client.publish(pub_sub_channel, final_key.as_ref()).await?);
544
4
        }
545
546
4
        Ok(())
547
14
    }
548
549
    async fn get_part(
550
        self: Pin<&Self>,
551
        key: StoreKey<'_>,
552
        writer: &mut DropCloserWriteHalf,
553
        offset: u64,
554
        length: Option<u64>,
555
10
    ) -> Result<(), Error> {
556
5
        let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")
?0
;
557
5
        let length = length
558
5
            .map(|v| 
usize::try_from4
(
v4
).
err_tip4
(|| "Could not convert length to usize"))
559
5
            .transpose()
?0
;
560
561
        // To follow RBE spec we need to consider any digest's with
562
        // zero size to be existing.
563
5
        if is_zero_digest(key.borrow()) {
  Branch (563:12): [True: 0, False: 5]
  Branch (563:12): [Folded - Ignored]
564
0
            return writer
565
0
                .send_eof()
566
0
                .err_tip(|| "Failed to send zero EOF in redis store get_part");
567
5
        }
568
569
5
        let client = self.get_client().await
?0
;
570
5
        let encoded_key = self.encode_key(&key);
571
5
        let encoded_key = encoded_key.as_ref();
572
573
        // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we
574
        // do math with indices we change them to be exclusive at the end.
575
576
        // We want to read the data at the key from `offset` to `offset + length`.
577
5
        let data_start = offset;
578
5
        let data_end = data_start
579
5
            .saturating_add(length.unwrap_or(isize::MAX as usize))
580
5
            .saturating_sub(1);
581
582
        // And we don't ever want to read more than `read_chunk_size` bytes at a time, so we'll need to iterate.
583
5
        let mut chunk_start = data_start;
584
5
        let mut chunk_end = cmp::min(
585
5
            data_start.saturating_add(self.read_chunk_size) - 1,
586
5
            data_end,
587
        );
588
589
        loop {
590
8
            let chunk: Bytes = client
591
8
                .getrange(encoded_key, chunk_start, chunk_end)
592
8
                .await
593
8
                .err_tip(|| "In RedisStore::get_part::getrange")
?0
;
594
595
8
            let didnt_receive_full_chunk = chunk.len() < self.read_chunk_size;
596
8
            let reached_end_of_data = chunk_end == data_end;
597
598
8
            if didnt_receive_full_chunk || 
reached_end_of_data4
{
  Branch (598:16): [True: 4, False: 4]
  Branch (598:44): [True: 1, False: 3]
  Branch (598:16): [Folded - Ignored]
  Branch (598:44): [Folded - Ignored]
599
5
                if !chunk.is_empty() {
  Branch (599:20): [True: 4, False: 1]
  Branch (599:20): [Folded - Ignored]
600
4
                    writer
601
4
                        .send(chunk)
602
4
                        .await
603
4
                        .err_tip(|| "Failed to write data in RedisStore::get_part")
?0
;
604
1
                }
605
606
5
                break; // No more data to read.
607
3
            }
608
609
            // We received a full chunk's worth of data, so write it...
610
3
            writer
611
3
                .send(chunk)
612
3
                .await
613
3
                .err_tip(|| "Failed to write data in RedisStore::get_part")
?0
;
614
615
            // ...and go grab the next chunk.
616
3
            chunk_start = chunk_end + 1;
617
3
            chunk_end = cmp::min(
618
3
                chunk_start.saturating_add(self.read_chunk_size) - 1,
619
3
                data_end,
620
3
            );
621
        }
622
623
        // If we didn't write any data, check if the key exists, if not return a NotFound error.
624
        // This is required by spec.
625
5
        if writer.get_bytes_written() == 0 {
  Branch (625:12): [True: 1, False: 4]
  Branch (625:12): [Folded - Ignored]
626
            // We're supposed to read 0 bytes, so just check if the key exists.
627
1
            let exists = client
628
1
                .exists::<bool, _>(encoded_key)
629
1
                .await
630
1
                .err_tip(|| "In RedisStore::get_part::zero_exists")
?0
;
631
632
1
            if !exists {
  Branch (632:16): [True: 1, False: 0]
  Branch (632:16): [Folded - Ignored]
633
1
                return Err(make_err!(
634
1
                    Code::NotFound,
635
1
                    "Data not found in Redis store for digest: {key:?}"
636
1
                ));
637
0
            }
638
4
        }
639
640
4
        writer
641
4
            .send_eof()
642
4
            .err_tip(|| "Failed to write EOF in redis store get_part")
643
10
    }
644
645
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
646
0
        self
647
0
    }
648
649
0
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) {
650
0
        self
651
0
    }
652
653
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> {
654
0
        self
655
0
    }
656
657
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
658
0
        registry.register_indicator(self);
659
0
    }
660
}
661
662
/// Health reporting for `RedisStore`.
#[async_trait]
663
impl HealthStatusIndicator for RedisStore {
664
0
    /// Name reported for this indicator; always the literal `"RedisStore"`.
    fn get_name(&self) -> &'static str {
665
0
        "RedisStore"
666
0
    }
667
668
0
    /// Delegates to `StoreDriver::check_health` for this store, forwarding `namespace`
    /// unchanged and returning whatever status that check produces.
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
669
0
        StoreDriver::check_health(Pin::new(self), namespace).await
670
0
    }
671
}
672
673
// -------------------------------------------------------------------
674
// Below this line are specific to the redis scheduler implementation.
675
// -------------------------------------------------------------------
676
677
/// The maximum number of results to return per cursor.
/// Passed as the cursor `count` of the aggregate request in `search_by_index_prefix`.
678
const MAX_COUNT_PER_CURSOR: u64 = 256;
679
/// The time in milliseconds that a redis cursor can be idle before it is closed.
/// Passed as the cursor `max_idle` of the aggregate request in `search_by_index_prefix`.
680
const CURSOR_IDLE_MS: u64 = 2_000;
681
/// The name of the field in the Redis hash that stores the data.
/// Also interpolated into `LUA_VERSION_SET_SCRIPT`.
682
const DATA_FIELD_NAME: &str = "data";
683
/// The name of the field in the Redis hash that stores the version.
/// Also interpolated into `LUA_VERSION_SET_SCRIPT`.
684
const VERSION_FIELD_NAME: &str = "version";
685
/// The time to live of indexes in seconds. After this time redis may delete the index.
/// Passed as the `temporary` option when `search_by_index_prefix` creates an index.
686
const INDEX_TTL_S: u64 = 60 * 60 * 24; // 24 hours.
687
688
/// Lua script to set a key if the version matches.
///
/// The script first increments the stored version, then compares the pre-increment
/// value with the expected version. On a mismatch it undoes the increment and stops;
/// on a match it deletes the key and rewrites the hash with the new data, the new
/// version and any additional fields.
689
/// Args:
690
///   KEYS[1]: The key where the version is stored.
691
///   ARGV[1]: The expected version.
692
///   ARGV[2]: The new data.
693
///   ARGV[3*]: Key-value pairs of additional data to include.
694
/// Returns:
695
///   The new version if the version matches. 0 is returned if the
696
///   value was not set (callers treat 0 as "version mismatch").
697
const LUA_VERSION_SET_SCRIPT: &str = formatcp!(
698
    r"
699
local key = KEYS[1]
700
local expected_version = tonumber(ARGV[1])
701
local new_data = ARGV[2]
702
local new_version = redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', 1)
703
local i
704
local indexes = {{}}
705
706
if new_version-1 ~= expected_version then
707
    redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', -1)
708
    return 0
709
end
710
-- Skip first 2 argvs, as they are known inputs.
711
-- Remember: Lua is 1-indexed.
712
for i=3, #ARGV do
713
    indexes[i-2] = ARGV[i]
714
end
715
716
-- In testing we witnessed redis sometimes not update our FT indexes
717
-- resulting in stale data. It appears if we delete our keys then insert
718
-- them again it works and reduces risk significantly.
719
redis.call('DEL', key)
720
redis.call('HSET', key, '{DATA_FIELD_NAME}', new_data, '{VERSION_FIELD_NAME}', new_version, unpack(indexes))
721
722
return new_version
723
"
724
);
725
726
/// This is the output of the calculations below hardcoded into the executable.
///
/// It is the lowercase, zero-padded hex form of the checksum computed in
/// `test::fingerprint_create_index_template` (verified by `test::test_fingerprint_value`)
/// and is appended to every index name built by `get_index_name!`.
727
const FINGERPRINT_CREATE_INDEX_HEX: &str = "3e762c15";
728
729
#[cfg(test)]
mod test {
    use super::FINGERPRINT_CREATE_INDEX_HEX;

    /// Text of the `FT.CREATE` command template whose checksum identifies the
    /// index definition.
    const CREATE_INDEX_TEMPLATE: &str = "FT.CREATE {} ON HASH PREFIX 1 {} NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA {} TAG CASESENSITIVE SORTABLE";

    /// Computes a CRC32-style checksum over [`CREATE_INDEX_TEMPLATE`].
    ///
    /// It only has to be a stable identifier with a low collision chance, not a
    /// standards-conformant CRC32. Plain `while` loops are used so the function
    /// stays usable in `const` contexts.
    const fn fingerprint_create_index_template() -> u32 {
        const POLY: u32 = 0xEDB8_8320;
        const DATA: &[u8] = CREATE_INDEX_TEMPLATE.as_bytes();

        let mut checksum: u32 = 0xFFFF_FFFF;
        let mut byte_idx = 0;
        while byte_idx < DATA.len() {
            checksum ^= DATA[byte_idx] as u32;

            // Shift out one bit at a time, folding in the polynomial whenever
            // the bit that falls off is set.
            let mut remaining_bits = 8;
            while remaining_bits > 0 {
                let low_bit_set = checksum & 1 != 0;
                checksum >>= 1;
                if low_bit_set {
                    checksum ^= POLY;
                }
                remaining_bits -= 1;
            }
            byte_idx += 1;
        }
        checksum
    }

    /// The hardcoded fingerprint must match what the calculation produces.
    #[test]
    fn test_fingerprint_value() {
        let computed = format!("{:08x}", fingerprint_create_index_template());
        assert_eq!(computed, FINGERPRINT_CREATE_INDEX_HEX);
    }
}
772
773
/// Get the name of the index to create for the given field.
774
/// This will add some prefix data to the name to try and ensure
775
/// if the index definition changes, the name will get a new name.
///
/// The name has the form `<prefix>_<field>_<sort>_<fingerprint>`, where `<sort>` is
/// empty when `$maybe_sort` is `None` and `<fingerprint>` is
/// `FINGERPRINT_CREATE_INDEX_HEX`.
///
/// The macro expands to `format_args!`, so the result is not an owned string: feed it
/// straight into `format!`/`write!` rather than storing it.
776
macro_rules! get_index_name {
777
    ($prefix:expr, $field:expr, $maybe_sort:expr) => {
778
        format_args!(
779
            "{}_{}_{}_{}",
780
            $prefix,
781
            $field,
782
            // `$maybe_sort` must support `.unwrap_or("")`, i.e. be an `Option` of a string slice.
            $maybe_sort.unwrap_or(""),
783
            FINGERPRINT_CREATE_INDEX_HEX
784
        )
785
    };
786
}
787
788
/// Validates that a string is safe to embed in a Redis search expression.
///
/// Nothing is rewritten: the input is handed back untouched when every byte is an
/// ASCII letter, an ASCII digit or `_`, and `None` is returned otherwise. An empty
/// string is accepted.
const fn try_sanitize(s: &str) -> Option<&str> {
    // `for` loops and iterators are not available in `const fn`, so walk the bytes
    // by index. Staying `const` lets the compiler evaluate calls on constant input.
    let bytes = s.as_bytes();
    let mut idx: usize = 0;
    while idx < bytes.len() {
        let byte = bytes[idx];
        let allowed = byte.is_ascii_alphanumeric() || byte == b'_';
        if !allowed {
            return None;
        }
        idx += 1;
    }
    Some(s)
}
809
810
/// An individual subscription to a key in Redis.
///
/// Dropping a subscription removes its key from the shared subscribed-keys map once
/// no other receivers for that key remain (see the `Drop` impl).
811
#[derive(Debug)]
812
pub struct RedisSubscription {
813
    /// Watch receiver that is signalled when the key changes; the channel's value is
    /// the subscribed key itself. Set to `None` once `drop` has taken it.
    receiver: Option<tokio::sync::watch::Receiver<String>>,
814
    /// Weak handle to the owning manager's key map, used on drop to clean up the
    /// entry without keeping the map alive.
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
815
}
816
817
impl SchedulerSubscription for RedisSubscription {
818
    /// Wait for the subscription key to change.
819
13
    async fn changed(&mut self) -> Result<(), Error> {
820
13
        let receiver = self
821
13
            .receiver
822
13
            .as_mut()
823
13
            .ok_or_else(|| make_err!(
Code::Internal0
, "In RedisSubscription::changed::as_mut"))
?0
;
824
13
        receiver
825
13
            .changed()
826
13
            .await
827
11
            .map_err(|_| make_err!(
Code::Internal0
, "In RedisSubscription::changed::changed"))
828
11
    }
829
}
830
831
// If the subscription is dropped, we need to possibly remove the key from the
832
// subscribed keys map.
833
impl Drop for RedisSubscription {
834
3
    fn drop(&mut self) {
835
3
        let Some(receiver) = self.receiver.take() else {
  Branch (835:13): [True: 3, False: 0]
  Branch (835:13): [Folded - Ignored]
836
0
            warn!("RedisSubscription has already been dropped, nothing to do.");
837
0
            return; // Already dropped, nothing to do.
838
        };
839
3
        let key = receiver.borrow().clone();
840
        // IMPORTANT: This must be dropped before receiver_count() is called.
841
3
        drop(receiver);
842
3
        let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else {
  Branch (842:13): [True: 3, False: 0]
  Branch (842:13): [Folded - Ignored]
843
0
            return; // Already dropped, nothing to do.
844
        };
845
3
        let mut subscribed_keys = subscribed_keys.write();
846
3
        let Some(value) = subscribed_keys.get(&key) else {
  Branch (846:13): [True: 3, False: 0]
  Branch (846:13): [Folded - Ignored]
847
0
            error!(
848
0
                "Key {key} was not found in subscribed keys when checking if it should be removed."
849
            );
850
0
            return;
851
        };
852
        // If we have no receivers, cleanup the entry from our map.
853
3
        if value.receiver_count() == 0 {
  Branch (853:12): [True: 3, False: 0]
  Branch (853:12): [Folded - Ignored]
854
3
            subscribed_keys.remove(key);
855
3
        
}0
856
3
    }
857
}
858
859
/// A publisher for a key in Redis.
///
/// Owns the sending half of the watch channel whose receivers are held by the
/// `RedisSubscription`s for that key.
860
#[derive(Debug)]
861
struct RedisSubscriptionPublisher {
862
    /// Sending half of the watch channel. It is wrapped in a `Mutex` because the
    /// key map only hands out shared references to its values (see the TODO in
    /// `notify`).
    sender: Mutex<tokio::sync::watch::Sender<String>>,
863
}
864
865
impl RedisSubscriptionPublisher {
866
3
    fn new(
867
3
        key: String,
868
3
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
869
3
    ) -> (Self, RedisSubscription) {
870
3
        let (sender, receiver) = tokio::sync::watch::channel(key);
871
3
        let publisher = Self {
872
3
            sender: Mutex::new(sender),
873
3
        };
874
3
        let subscription = RedisSubscription {
875
3
            receiver: Some(receiver),
876
3
            weak_subscribed_keys,
877
3
        };
878
3
        (publisher, subscription)
879
3
    }
880
881
0
    fn subscribe(
882
0
        &self,
883
0
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
884
0
    ) -> RedisSubscription {
885
0
        let receiver = self.sender.lock().subscribe();
886
0
        RedisSubscription {
887
0
            receiver: Some(receiver),
888
0
            weak_subscribed_keys,
889
0
        }
890
0
    }
891
892
3
    fn receiver_count(&self) -> usize {
893
3
        self.sender.lock().receiver_count()
894
3
    }
895
896
8
    fn notify(&self) {
897
        // TODO(https://github.com/sile/patricia_tree/issues/40) When this is addressed
898
        // we can remove the `Mutex` and use the mutable iterator directly.
899
8
        self.sender.lock().send_modify(|_| {});
900
8
    }
901
}
902
903
/// Tracks which keys have active subscriptions and notifies them when a message for
/// a matching key arrives on the Redis pub/sub channel.
#[derive(Debug)]
904
pub struct RedisSubscriptionManager {
905
    /// Publishers for every key that currently has a subscription, keyed by the key
    /// string. Shared (weakly) with the background task and with each subscription.
    subscribed_keys: Arc<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
906
    /// Sender used by `notify_for_test` to inject a key into the background task as
    /// if it had been received from Redis.
    tx_for_test: tokio::sync::mpsc::UnboundedSender<String>,
907
    /// Handle of the background task that listens for pub/sub messages.
    // NOTE(review): presumably the guard stops the task when dropped - confirm
    // against `JoinHandleDropGuard`.
    _subscription_spawn: JoinHandleDropGuard<()>,
908
}
909
910
impl RedisSubscriptionManager {
911
2
    pub fn new(subscribe_client: SubscriberClient, pub_sub_channel: String) -> Self {
912
2
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
913
2
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);
914
2
        let (tx_for_test, mut rx_for_test) = tokio::sync::mpsc::unbounded_channel();
915
        Self {
916
2
            subscribed_keys,
917
2
            tx_for_test,
918
2
            _subscription_spawn: spawn!("redis_subscribe_spawn", async move {
919
2
                let mut rx = subscribe_client.message_rx();
920
                loop {
921
2
                    if let Err(
e0
) = subscribe_client.subscribe(&pub_sub_channel).await {
  Branch (921:28): [True: 0, False: 2]
  Branch (921:28): [Folded - Ignored]
922
0
                        error!("Error subscribing to pattern - {e}");
923
0
                        return;
924
2
                    }
925
2
                    let mut reconnect_rx = subscribe_client.reconnect_rx();
926
2
                    let reconnect_fut = reconnect_rx.recv().fuse();
927
2
                    tokio::pin!(reconnect_fut);
928
                    loop {
929
9
                        let 
key7
= select! {
930
9
                            
value7
= rx_for_test.recv() => {
931
7
                                let Some(value) = value else {
  Branch (931:37): [True: 7, False: 0]
  Branch (931:37): [Folded - Ignored]
932
0
                                    unreachable!("Channel should never close");
933
                                };
934
7
                                value.into()
935
                            },
936
9
                            
msg0
= rx.recv() => {
937
0
                                match msg {
938
0
                                    Ok(msg) => {
939
0
                                        if let RedisValue::String(s) = msg.value {
  Branch (939:48): [True: 0, False: 0]
  Branch (939:48): [Folded - Ignored]
940
0
                                            s
941
                                        } else {
942
0
                                            error!("Received non-string message in RedisSubscriptionManager");
943
0
                                            continue;
944
                                        }
945
                                    },
946
0
                                    Err(e) => {
947
                                        // Check to see if our parent has been dropped and if so kill spawn.
948
0
                                        if subscribed_keys_weak.upgrade().is_none() {
  Branch (948:44): [True: 0, False: 0]
  Branch (948:44): [Folded - Ignored]
949
0
                                            warn!("It appears our parent has been dropped, exiting RedisSubscriptionManager spawn");
950
0
                                            return;
951
0
                                        }
952
0
                                        error!("Error receiving message in RedisSubscriptionManager reconnecting and flagging everything changed - {e}");
953
0
                                        break;
954
                                    }
955
                                }
956
                            },
957
9
                            _ = &mut reconnect_fut => {
958
0
                                warn!("Redis reconnected flagging all subscriptions as changed and resuming");
959
0
                                break;
960
                            }
961
                        };
962
7
                        let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
  Branch (962:29): [True: 7, False: 0]
  Branch (962:29): [Folded - Ignored]
963
0
                            warn!(
964
0
                                "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
965
                            );
966
0
                            return;
967
                        };
968
7
                        let subscribed_keys_mux = subscribed_keys.read();
969
7
                        subscribed_keys_mux
970
7
                            .common_prefix_values(&*key)
971
7
                            .for_each(RedisSubscriptionPublisher::notify);
972
                    }
973
                    // Sleep for a small amount of time to ensure we don't reconnect too quickly.
974
0
                    sleep(Duration::from_secs(1)).await;
975
                    // If we reconnect or lag behind we might have had dirty keys, so we need to
976
                    // flag all of them as changed.
977
0
                    let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
  Branch (977:25): [True: 0, False: 0]
  Branch (977:25): [Folded - Ignored]
978
0
                        warn!(
979
0
                            "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
980
                        );
981
0
                        return;
982
                    };
983
0
                    let subscribed_keys_mux = subscribed_keys.read();
984
                    // Just in case also get a new receiver.
985
0
                    rx = subscribe_client.message_rx();
986
                    // Drop all buffered messages, then flag everything as changed.
987
0
                    rx.resubscribe();
988
0
                    for publisher in subscribed_keys_mux.values() {
989
0
                        publisher.notify();
990
0
                    }
991
                }
992
0
            }),
993
        }
994
2
    }
995
}
996
997
impl SchedulerSubscriptionManager for RedisSubscriptionManager {
998
    type Subscription = RedisSubscription;
999
1000
7
    fn notify_for_test(&self, value: String) {
1001
7
        self.tx_for_test.send(value).unwrap();
1002
7
    }
1003
1004
3
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
1005
3
    where
1006
3
        K: SchedulerStoreKeyProvider,
1007
    {
1008
3
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
1009
3
        let mut subscribed_keys = self.subscribed_keys.write();
1010
3
        let key = key.get_key();
1011
3
        let key_str = key.as_str();
1012
3
        let mut subscription = if let Some(
publisher0
) = subscribed_keys.get(&key_str) {
  Branch (1012:39): [True: 0, False: 0]
  Branch (1012:39): [Folded - Ignored]
  Branch (1012:39): [Folded - Ignored]
  Branch (1012:39): [True: 0, False: 3]
1013
0
            publisher.subscribe(weak_subscribed_keys)
1014
        } else {
1015
3
            let (publisher, subscription) =
1016
3
                RedisSubscriptionPublisher::new(key_str.to_string(), weak_subscribed_keys);
1017
3
            subscribed_keys.insert(key_str, publisher);
1018
3
            subscription
1019
        };
1020
3
        subscription
1021
3
            .receiver
1022
3
            .as_mut()
1023
3
            .ok_or_else(|| 
{0
1024
0
                make_err!(
1025
0
                    Code::Internal,
1026
                    "Receiver should be set in RedisSubscriptionManager::subscribe"
1027
                )
1028
0
            })?
1029
3
            .mark_changed();
1030
1031
3
        Ok(subscription)
1032
3
    }
1033
}
1034
1035
impl SchedulerStore for RedisStore {
1036
    type SubscriptionManager = RedisSubscriptionManager;
1037
1038
5
    fn subscription_manager(&self) -> Result<Arc<RedisSubscriptionManager>, Error> {
1039
5
        let mut subscription_manager = self.subscription_manager.lock();
1040
1041
5
        if let Some(
subscription_manager3
) = &*subscription_manager {
  Branch (1041:16): [True: 3, False: 2]
  Branch (1041:16): [Folded - Ignored]
1042
3
            Ok(subscription_manager.clone())
1043
        } else {
1044
2
            let Some(pub_sub_channel) = &self.pub_sub_channel else {
  Branch (1044:17): [True: 2, False: 0]
  Branch (1044:17): [Folded - Ignored]
1045
0
                return Err(make_input_err!(
1046
0
                    "RedisStore must have a pubsub channel for a Redis Scheduler if using subscriptions"
1047
0
                ));
1048
            };
1049
2
            let sub = Arc::new(RedisSubscriptionManager::new(
1050
2
                self.subscriber_client.clone(),
1051
2
                pub_sub_channel.clone(),
1052
            ));
1053
2
            *subscription_manager = Some(sub.clone());
1054
2
            Ok(sub)
1055
        }
1056
5
    }
1057
1058
7
    async fn update_data<T>(&self, data: T) -> Result<Option<u64>, Error>
1059
7
    where
1060
7
        T: SchedulerStoreDataProvider
1061
7
            + SchedulerStoreKeyProvider
1062
7
            + SchedulerCurrentVersionProvider
1063
7
            + Send,
1064
7
    {
1065
7
        let key = data.get_key();
1066
7
        let key = self.encode_key(&key);
1067
7
        let client = self.get_client().await
?0
;
1068
7
        let maybe_index = data.get_indexes().err_tip(|| 
{0
1069
0
            format!("Err getting index in RedisStore::update_data::versioned for {key:?}")
1070
0
        })?;
1071
7
        if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {
  Branch (1071:12): [True: 0, False: 0]
  Branch (1071:12): [True: 0, False: 0]
  Branch (1071:12): [Folded - Ignored]
  Branch (1071:12): [Folded - Ignored]
  Branch (1071:12): [True: 0, False: 3]
  Branch (1071:12): [True: 4, False: 0]
1072
4
            let current_version = data.current_version();
1073
4
            let data = data.try_into_bytes().err_tip(|| 
{0
1074
0
                format!("Could not convert value to bytes in RedisStore::update_data::versioned for {key:?}")
1075
0
            })?;
1076
4
            let mut argv = Vec::with_capacity(3 + maybe_index.len() * 2);
1077
4
            argv.push(Bytes::from(format!("{current_version}")));
1078
4
            argv.push(data);
1079
16
            for (
name12
,
value12
) in maybe_index {
1080
12
                argv.push(Bytes::from_static(name.as_bytes()));
1081
12
                argv.push(value);
1082
12
            }
1083
4
            let new_version = self
1084
4
                .update_if_version_matches_script
1085
4
                .evalsha_with_reload::<u64, _, Vec<Bytes>>(client, vec![key.as_ref()], argv)
1086
4
                .await
1087
4
                .err_tip(|| format!(
"In RedisStore::update_data::versioned for {key:?}"0
))
?0
;
1088
4
            if new_version == 0 {
  Branch (1088:16): [True: 0, False: 0]
  Branch (1088:16): [True: 0, False: 0]
  Branch (1088:16): [Folded - Ignored]
  Branch (1088:16): [Folded - Ignored]
  Branch (1088:16): [True: 0, False: 0]
  Branch (1088:16): [True: 0, False: 4]
1089
0
                return Ok(None);
1090
4
            }
1091
            // If we have a publish channel configured, send a notice that the key has been set.
1092
4
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (1092:20): [True: 0, False: 0]
  Branch (1092:20): [True: 0, False: 0]
  Branch (1092:20): [Folded - Ignored]
  Branch (1092:20): [Folded - Ignored]
  Branch (1092:20): [True: 0, False: 0]
  Branch (1092:20): [True: 4, False: 0]
1093
4
                return Ok(client.publish(pub_sub_channel, key.as_ref()).await
?0
);
1094
0
            }
1095
0
            Ok(Some(new_version))
1096
        } else {
1097
3
            let data = data.try_into_bytes().err_tip(|| 
{0
1098
0
                format!("Could not convert value to bytes in RedisStore::update_data::noversion for {key:?}")
1099
0
            })?;
1100
3
            let mut fields = RedisMap::new();
1101
3
            fields.reserve(1 + maybe_index.len());
1102
3
            fields.insert(DATA_FIELD_NAME.into(), data.into());
1103
3
            for (
name0
,
value0
) in maybe_index {
1104
0
                fields.insert(name.into(), value.into());
1105
0
            }
1106
3
            client
1107
3
                .hset::<(), _, _>(key.as_ref(), fields)
1108
3
                .await
1109
3
                .err_tip(|| format!(
"In RedisStore::update_data::noversion for {key:?}"0
))
?0
;
1110
            // If we have a publish channel configured, send a notice that the key has been set.
1111
3
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (1111:20): [True: 0, False: 0]
  Branch (1111:20): [True: 0, False: 0]
  Branch (1111:20): [Folded - Ignored]
  Branch (1111:20): [Folded - Ignored]
  Branch (1111:20): [True: 3, False: 0]
  Branch (1111:20): [True: 0, False: 0]
1112
3
                return Ok(client.publish(pub_sub_channel, key.as_ref()).await
?0
);
1113
0
            }
1114
0
            Ok(Some(0)) // Always use "0" version since this is not a versioned request.
1115
        }
1116
7
    }
1117
1118
3
    async fn search_by_index_prefix<K>(
1119
3
        &self,
1120
3
        index: K,
1121
3
    ) -> Result<
1122
3
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
1123
3
        Error,
1124
3
    >
1125
3
    where
1126
3
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
1127
3
    {
1128
3
        let index_value = index.index_value();
1129
5
        let 
run_ft_aggregate3
= || {
1130
5
            let client = self.client_pool.next().clone();
1131
5
            let sanitized_field = try_sanitize(index_value.as_ref()).err_tip(|| 
{0
1132
0
                format!("In RedisStore::search_by_index_prefix::try_sanitize - {index_value:?}")
1133
0
            })?;
1134
5
            Ok::<_, Error>(async move {
1135
5
                ft_aggregate(
1136
5
                    client,
1137
5
                    format!(
1138
5
                        "{}",
1139
5
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1140
                    ),
1141
5
                    format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_field),
1142
                    FtAggregateOptions {
1143
5
                        load: Some(Load::Some(vec![
1144
5
                            SearchField {
1145
5
                                identifier: DATA_FIELD_NAME.into(),
1146
5
                                property: None,
1147
5
                            },
1148
5
                            SearchField {
1149
5
                                identifier: VERSION_FIELD_NAME.into(),
1150
5
                                property: None,
1151
5
                            },
1152
5
                        ])),
1153
5
                        cursor: Some(WithCursor {
1154
5
                            count: Some(MAX_COUNT_PER_CURSOR),
1155
5
                            max_idle: Some(CURSOR_IDLE_MS),
1156
5
                        }),
1157
5
                        pipeline: vec![AggregateOperation::SortBy {
1158
5
                            properties: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| 
{0
1159
0
                                vec![(format!("@{v}").into(), SortOrder::Asc)]
1160
0
                            }),
1161
5
                            max: None,
1162
                        }],
1163
5
                        ..Default::default()
1164
                    },
1165
                )
1166
5
                .await
1167
5
            })
1168
5
        };
1169
3
        let stream = run_ft_aggregate()
?0
1170
3
            .or_else(|_| async move 
{2
1171
2
                let mut schema = vec![SearchSchema {
1172
2
                    field_name: K::INDEX_NAME.into(),
1173
2
                    alias: None,
1174
2
                    kind: SearchSchemaKind::Tag {
1175
2
                        sortable: false,
1176
2
                        unf: false,
1177
2
                        separator: None,
1178
2
                        casesensitive: false,
1179
2
                        withsuffixtrie: false,
1180
2
                        noindex: false,
1181
2
                    },
1182
2
                }];
1183
2
                if let Some(
sort_key0
) = K::MAYBE_SORT_KEY {
  Branch (1183:24): [True: 0, False: 0]
  Branch (1183:24): [True: 0, False: 0]
  Branch (1183:24): [Folded - Ignored]
  Branch (1183:24): [Folded - Ignored]
  Branch (1183:24): [True: 0, False: 2]
1184
0
                    schema.push(SearchSchema {
1185
0
                        field_name: sort_key.into(),
1186
0
                        alias: None,
1187
0
                        kind: SearchSchemaKind::Tag {
1188
0
                            sortable: true,
1189
0
                            unf: false,
1190
0
                            separator: None,
1191
0
                            casesensitive: false,
1192
0
                            withsuffixtrie: false,
1193
0
                            noindex: false,
1194
0
                        },
1195
0
                    });
1196
2
                }
1197
2
                let create_result = self
1198
2
                    .client_pool
1199
2
                    .next()
1200
2
                    .ft_create::<(), _>(
1201
2
                        format!(
1202
2
                            "{}",
1203
2
                            get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1204
2
                        ),
1205
2
                        FtCreateOptions {
1206
2
                            on: Some(IndexKind::Hash),
1207
2
                            prefixes: vec![K::KEY_PREFIX.into()],
1208
2
                            nohl: true,
1209
2
                            nofields: true,
1210
2
                            nofreqs: true,
1211
2
                            nooffsets: true,
1212
2
                            temporary: Some(INDEX_TTL_S),
1213
2
                            ..Default::default()
1214
2
                        },
1215
2
                        schema,
1216
2
                    )
1217
2
                    .await
1218
2
                    .err_tip(|| 
{0
1219
0
                        format!(
1220
0
                            "Error with ft_create in RedisStore::search_by_index_prefix({})",
1221
0
                            get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1222
                        )
1223
0
                    });
1224
2
                let run_result = run_ft_aggregate()
?0
.await.err_tip(||
{0
1225
0
                    format!(
1226
0
                        "Error with second ft_aggregate in RedisStore::search_by_index_prefix({})",
1227
0
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1228
                    )
1229
0
                });
1230
                // Creating the index will race which is ok. If it fails to create, we only
1231
                // error if the second ft_aggregate call fails and fails to create.
1232
2
                run_result.or_else(move |e| 
create_result0
.
merge0
(
Err(e)0
))
1233
4
            })
1234
3
            .await
?0
;
1235
3
        Ok(stream.map(|result| 
{1
1236
1
            let mut redis_map =
1237
1
                result.err_tip(|| "Error in stream of in RedisStore::search_by_index_prefix")
?0
;
1238
1
            let bytes_data = redis_map
1239
1
                .remove(&RedisKey::from_static_str(DATA_FIELD_NAME))
1240
1
                .err_tip(|| "Missing data field in RedisStore::search_by_index_prefix")
?0
1241
1
                .into_bytes()
1242
1
                .err_tip(|| 
{0
1243
0
                    formatcp!("'{DATA_FIELD_NAME}' is not Bytes in RedisStore::search_by_index_prefix::into_bytes")
1244
0
                })?;
1245
1
            let version = if <K as SchedulerIndexProvider>::Versioned::VALUE {
  Branch (1245:30): [True: 0, False: 0]
  Branch (1245:30): [True: 0, False: 0]
  Branch (1245:30): [Folded - Ignored]
  Branch (1245:30): [Folded - Ignored]
  Branch (1245:30): [True: 1, False: 0]
1246
1
                redis_map
1247
1
                    .remove(&RedisKey::from_static_str(VERSION_FIELD_NAME))
1248
1
                    .err_tip(|| "Missing version field in RedisStore::search_by_index_prefix")
?0
1249
1
                    .as_u64()
1250
1
                    .err_tip(|| 
{0
1251
0
                        formatcp!("'{VERSION_FIELD_NAME}' is not u64 in RedisStore::search_by_index_prefix::as_u64")
1252
0
                    })?
1253
            } else {
1254
0
                0
1255
            };
1256
1
            K::decode(version, bytes_data)
1257
1
                .err_tip(|| "In RedisStore::search_by_index_prefix::decode")
1258
1
        }))
1259
3
    }
1260
1261
6
    async fn get_and_decode<K>(
1262
6
        &self,
1263
6
        key: K,
1264
6
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
1265
6
    where
1266
6
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
1267
6
    {
1268
6
        let key = key.get_key();
1269
6
        let key = self.encode_key(&key);
1270
6
        let client = self.get_client().await
?0
;
1271
6
        let (maybe_version, maybe_data) = client
1272
6
            .hmget::<(Option<u64>, Option<Bytes>), _, _>(
1273
6
                key.as_ref(),
1274
6
                vec![
1275
6
                    RedisKey::from(VERSION_FIELD_NAME),
1276
6
                    RedisKey::from(DATA_FIELD_NAME),
1277
6
                ],
1278
6
            )
1279
6
            .await
1280
6
            .err_tip(|| format!(
"In RedisStore::get_without_version::notversioned {key}"0
))
?0
;
1281
6
        let Some(data) = maybe_data else {
  Branch (1281:13): [True: 0, False: 0]
  Branch (1281:13): [True: 0, False: 0]
  Branch (1281:13): [Folded - Ignored]
  Branch (1281:13): [Folded - Ignored]
  Branch (1281:13): [True: 2, False: 0]
  Branch (1281:13): [True: 4, False: 0]
1282
0
            return Ok(None);
1283
        };
1284
6
        Ok(Some(K::decode(maybe_version.unwrap_or(0), data).err_tip(
1285
0
            || format!("In RedisStore::get_with_version::notversioned::decode {key}"),
1286
0
        )?))
1287
6
    }
1288
}