Coverage Report

Created: 2025-05-30 16:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/redis_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::ops::{Bound, RangeBounds};
17
use core::pin::Pin;
18
use core::time::Duration;
19
use std::borrow::Cow;
20
use std::sync::{Arc, Weak};
21
22
use async_trait::async_trait;
23
use bytes::Bytes;
24
use const_format::formatcp;
25
use fred::clients::{Pool as RedisPool, SubscriberClient};
26
use fred::interfaces::{ClientLike, KeysInterface, PubsubInterface};
27
use fred::prelude::{EventInterface, HashesInterface, RediSearchInterface};
28
use fred::types::config::{
29
    Config as RedisConfig, ConnectionConfig, PerformanceConfig, ReconnectPolicy, UnresponsiveConfig,
30
};
31
use fred::types::redisearch::{
32
    AggregateOperation, FtAggregateOptions, FtCreateOptions, IndexKind, Load, SearchField,
33
    SearchSchema, SearchSchemaKind, WithCursor,
34
};
35
use fred::types::scan::Scanner;
36
use fred::types::scripts::Script;
37
use fred::types::{Builder, Key as RedisKey, Map as RedisMap, SortOrder, Value as RedisValue};
38
use futures::stream::FuturesUnordered;
39
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future};
40
use nativelink_config::stores::{RedisMode, RedisSpec};
41
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
42
use nativelink_metric::MetricsComponent;
43
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
44
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
45
use nativelink_util::spawn;
46
use nativelink_util::store_trait::{
47
    BoolValue, SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore,
48
    SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
49
    SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,
50
};
51
use nativelink_util::task::JoinHandleDropGuard;
52
use parking_lot::{Mutex, RwLock};
53
use patricia_tree::StringPatriciaMap;
54
use tokio::select;
55
use tokio::time::sleep;
56
use tracing::{error, info, warn};
57
use uuid::Uuid;
58
59
use crate::cas_utils::is_zero_digest;
60
use crate::redis_utils::ft_aggregate;
61
62
/// The default size of the read chunk when reading data from Redis.
63
/// Note: If this changes it should be updated in the config documentation.
64
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;
65
66
/// The default size of the connection pool if not specified.
67
/// Note: If this changes it should be updated in the config documentation.
68
const DEFAULT_CONNECTION_POOL_SIZE: usize = 3;
69
70
/// The default delay between retries if not specified.
71
/// Note: If this changes it should be updated in the config documentation.
72
const DEFAULT_RETRY_DELAY: f32 = 0.1;
73
/// The amount of jitter to add to the retry delay if not specified.
74
/// Note: If this changes it should be updated in the config documentation.
75
const DEFAULT_RETRY_JITTER: f32 = 0.5;
76
77
/// The default maximum capacity of the broadcast channel if not specified.
78
/// Note: If this changes it should be updated in the config documentation.
79
const DEFAULT_BROADCAST_CHANNEL_CAPACITY: usize = 4096;
80
81
/// The default connection timeout in milliseconds if not specified.
82
/// Note: If this changes it should be updated in the config documentation.
83
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;
84
85
/// The default command timeout in milliseconds if not specified.
86
/// Note: If this changes it should be updated in the config documentation.
87
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;
88
89
/// The default maximum number of chunk uploads per update.
90
/// Note: If this changes it should be updated in the config documentation.
91
const DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE: usize = 10;
92
93
/// The default COUNT value passed when scanning keys in Redis.
94
/// Note: If this changes it should be updated in the config documentation.
95
const DEFAULT_SCAN_COUNT: u32 = 10_000;
96
97
/// A [`StoreDriver`] implementation that uses Redis as a backing store.
98
#[derive(Debug, MetricsComponent)]
99
pub struct RedisStore {
100
    /// The client pool connecting to the backing Redis instance(s).
101
    client_pool: RedisPool,
102
103
    /// A channel to publish updates to when a key is added, removed, or modified.
104
    #[metric(
105
        help = "The pubsub channel to publish updates to when a key is added, removed, or modified"
106
    )]
107
    pub_sub_channel: Option<String>,
108
109
    /// A redis client for managing subscriptions.
110
    /// TODO: This should be moved into the store in followups once a standard use pattern has been determined.
111
    subscriber_client: SubscriberClient,
112
113
    /// A function used to generate names for temporary keys.
114
    temp_name_generator_fn: fn() -> String,
115
116
    /// A common prefix to append to all keys before they are sent to Redis.
117
    ///
118
    /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`).
119
    #[metric(help = "Prefix to append to all keys before sending to Redis")]
120
    key_prefix: String,
121
122
    /// The amount of data to read from Redis at a time.
123
    #[metric(help = "The amount of data to read from Redis at a time")]
124
    read_chunk_size: usize,
125
126
    /// The maximum number of chunk uploads per update.
127
    /// This is used to limit the number of chunk uploads per update to prevent
128
    #[metric(help = "The maximum number of chunk uploads per update")]
129
    max_chunk_uploads_per_update: usize,
130
131
    /// The COUNT value passed when scanning keys in Redis.
132
    /// This is used to hint the amount of work that should be done per response.
133
    #[metric(help = "The COUNT value passed when scanning keys in Redis")]
134
    scan_count: u32,
135
136
    /// Redis script used to update a value in redis if the version matches.
137
    /// This is done by incrementing the version number and then setting the new data
138
    /// only if the version number matches the existing version number.
139
    update_if_version_matches_script: Script,
140
141
    /// A manager for subscriptions to keys in Redis.
142
    subscription_manager: Mutex<Option<Arc<RedisSubscriptionManager>>>,
143
}
144
145
impl RedisStore {
146
    /// Create a new `RedisStore` from the given configuration.
147
0
    pub fn new(mut spec: RedisSpec) -> Result<Arc<Self>, Error> {
148
0
        if spec.addresses.is_empty() {
  Branch (148:12): [True: 0, False: 0]
  Branch (148:12): [Folded - Ignored]
149
0
            return Err(make_err!(
150
0
                Code::InvalidArgument,
151
0
                "No addresses were specified in redis store configuration."
152
0
            ));
153
0
        }
154
0
        let [addr] = spec.addresses.as_slice() else {
  Branch (154:13): [True: 0, False: 0]
  Branch (154:13): [Folded - Ignored]
155
0
            return Err(make_err!(
156
0
                Code::Unimplemented,
157
0
                "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discover to find the other nodes."
158
0
            ));
159
        };
160
0
        let redis_config = match spec.mode {
161
0
            RedisMode::Cluster => RedisConfig::from_url_clustered(addr),
162
0
            RedisMode::Sentinel => RedisConfig::from_url_sentinel(addr),
163
0
            RedisMode::Standard => RedisConfig::from_url_centralized(addr),
164
        }
165
0
        .err_tip_with_code(|e| {
166
0
            (
167
0
                Code::InvalidArgument,
168
0
                format!("while parsing redis node address: {e}"),
169
0
            )
170
0
        })?;
171
172
0
        let reconnect_policy = {
173
0
            if spec.retry.delay == 0.0 {
  Branch (173:16): [True: 0, False: 0]
  Branch (173:16): [Folded - Ignored]
174
0
                spec.retry.delay = DEFAULT_RETRY_DELAY;
175
0
            }
176
0
            if spec.retry.jitter == 0.0 {
  Branch (176:16): [True: 0, False: 0]
  Branch (176:16): [Folded - Ignored]
177
0
                spec.retry.jitter = DEFAULT_RETRY_JITTER;
178
0
            }
179
180
0
            let max_retries = u32::try_from(spec.retry.max_retries)
181
0
                .err_tip(|| "max_retries could not be converted to u32 in RedisStore::new")?;
182
0
            let min_delay_ms = (spec.retry.delay * 1000.0) as u32;
183
0
            let max_delay_ms = 8000;
184
0
            let jitter = (spec.retry.jitter * spec.retry.delay * 1000.0) as u32;
185
186
0
            let mut reconnect_policy = ReconnectPolicy::new_exponential(
187
0
                max_retries,  /* max_retries, 0 is unlimited */
188
0
                min_delay_ms, /* min_delay */
189
0
                max_delay_ms, /* max_delay */
190
                2,            /* mult */
191
            );
192
0
            reconnect_policy.set_jitter(jitter);
193
0
            reconnect_policy
194
        };
195
196
        {
197
0
            if spec.broadcast_channel_capacity == 0 {
  Branch (197:16): [True: 0, False: 0]
  Branch (197:16): [Folded - Ignored]
198
0
                spec.broadcast_channel_capacity = DEFAULT_BROADCAST_CHANNEL_CAPACITY;
199
0
            }
200
0
            if spec.connection_timeout_ms == 0 {
  Branch (200:16): [True: 0, False: 0]
  Branch (200:16): [Folded - Ignored]
201
0
                spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS;
202
0
            }
203
0
            if spec.command_timeout_ms == 0 {
  Branch (203:16): [True: 0, False: 0]
  Branch (203:16): [Folded - Ignored]
204
0
                spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS;
205
0
            }
206
0
            if spec.connection_pool_size == 0 {
  Branch (206:16): [True: 0, False: 0]
  Branch (206:16): [Folded - Ignored]
207
0
                spec.connection_pool_size = DEFAULT_CONNECTION_POOL_SIZE;
208
0
            }
209
0
            if spec.read_chunk_size == 0 {
  Branch (209:16): [True: 0, False: 0]
  Branch (209:16): [Folded - Ignored]
210
0
                spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
211
0
            }
212
0
            if spec.max_chunk_uploads_per_update == 0 {
  Branch (212:16): [True: 0, False: 0]
  Branch (212:16): [Folded - Ignored]
213
0
                spec.max_chunk_uploads_per_update = DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE;
214
0
            }
215
0
            if spec.scan_count == 0 {
  Branch (215:16): [True: 0, False: 0]
  Branch (215:16): [Folded - Ignored]
216
0
                spec.scan_count = DEFAULT_SCAN_COUNT;
217
0
            }
218
        }
219
0
        let connection_timeout = Duration::from_millis(spec.connection_timeout_ms);
220
0
        let command_timeout = Duration::from_millis(spec.command_timeout_ms);
221
222
0
        let mut builder = Builder::from_config(redis_config);
223
0
        builder
224
0
            .set_performance_config(PerformanceConfig {
225
0
                default_command_timeout: command_timeout,
226
0
                broadcast_channel_capacity: spec.broadcast_channel_capacity,
227
0
                ..Default::default()
228
0
            })
229
0
            .set_connection_config(ConnectionConfig {
230
0
                connection_timeout,
231
0
                internal_command_timeout: command_timeout,
232
0
                unresponsive: UnresponsiveConfig {
233
0
                    max_timeout: Some(connection_timeout),
234
0
                    // This number needs to be less than the connection timeout.
235
0
                    // We use 4 as it is a good balance between not spamming the server
236
0
                    // and not waiting too long.
237
0
                    interval: connection_timeout / 4,
238
0
                },
239
0
                ..Default::default()
240
0
            })
241
0
            .set_policy(reconnect_policy);
242
243
0
        let client_pool = builder
244
0
            .build_pool(spec.connection_pool_size)
245
0
            .err_tip(|| "while creating redis connection pool")?;
246
247
0
        let subscriber_client = builder
248
0
            .build_subscriber_client()
249
0
            .err_tip(|| "while creating redis subscriber client")?;
250
251
0
        Self::new_from_builder_and_parts(
252
0
            client_pool,
253
0
            subscriber_client,
254
0
            spec.experimental_pub_sub_channel.clone(),
255
0
            || Uuid::new_v4().to_string(),
256
0
            spec.key_prefix.clone(),
257
0
            spec.read_chunk_size,
258
0
            spec.max_chunk_uploads_per_update,
259
0
            spec.scan_count,
260
        )
261
0
        .map(Arc::new)
262
0
    }
263
264
    /// Used for testing when determinism is required.
265
    #[expect(clippy::too_many_arguments)]
266
10
    pub fn new_from_builder_and_parts(
267
10
        client_pool: RedisPool,
268
10
        subscriber_client: SubscriberClient,
269
10
        pub_sub_channel: Option<String>,
270
10
        temp_name_generator_fn: fn() -> String,
271
10
        key_prefix: String,
272
10
        read_chunk_size: usize,
273
10
        max_chunk_uploads_per_update: usize,
274
10
        scan_count: u32,
275
10
    ) -> Result<Self, Error> {
276
        // Start connection pool (this will retry forever by default).
277
10
        client_pool.connect();
278
10
        subscriber_client.connect();
279
280
10
        info!("Redis index fingerprint: {FINGERPRINT_CREATE_INDEX_HEX}");
281
282
10
        Ok(Self {
283
10
            client_pool,
284
10
            pub_sub_channel,
285
10
            subscriber_client,
286
10
            temp_name_generator_fn,
287
10
            key_prefix,
288
10
            read_chunk_size,
289
10
            max_chunk_uploads_per_update,
290
10
            scan_count,
291
10
            update_if_version_matches_script: Script::from_lua(LUA_VERSION_SET_SCRIPT),
292
10
            subscription_manager: Mutex::new(None),
293
10
        })
294
10
    }
295
296
    /// Encode a [`StoreKey`] so it can be sent to Redis.
297
21
    fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> {
298
21
        let key_body = key.as_str();
299
21
        if self.key_prefix.is_empty() {
  Branch (299:12): [True: 17, False: 4]
  Branch (299:12): [Folded - Ignored]
300
17
            key_body
301
        } else {
302
            // This is in the hot path for all redis operations, so we try to reuse the allocation
303
            // from `key.as_str()` if possible.
304
4
            match key_body {
305
4
                Cow::Owned(mut encoded_key) => {
306
4
                    encoded_key.insert_str(0, &self.key_prefix);
307
4
                    Cow::Owned(encoded_key)
308
                }
309
0
                Cow::Borrowed(body) => {
310
0
                    let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());
311
0
                    encoded_key.push_str(&self.key_prefix);
312
0
                    encoded_key.push_str(body);
313
0
                    Cow::Owned(encoded_key)
314
                }
315
            }
316
        }
317
21
    }
318
}
319
320
#[async_trait]
321
impl StoreDriver for RedisStore {
322
    async fn has_with_results(
323
        self: Pin<&Self>,
324
        keys: &[StoreKey<'_>],
325
        results: &mut [Option<u64>],
326
12
    ) -> Result<(), Error> {
327
        // TODO(aaronmondal) We could use pipeline here, but it makes retry more
328
        // difficult and it doesn't work very well in cluster mode.
329
        // If we wanted to optimize this with pipeline be careful to
330
        // implement retry and to support cluster mode.
331
6
        let client = self.client_pool.next();
332
6
        keys.iter()
333
6
            .zip(results.iter_mut())
334
6
            .map(|(key, result)| async move {
335
                // We need to do a special pass to ensure our zero key exist.
336
6
                if is_zero_digest(key.borrow()) {
  Branch (336:20): [True: 2, False: 4]
  Branch (336:20): [Folded - Ignored]
337
2
                    *result = Some(0);
338
2
                    return Ok::<_, Error>(());
339
4
                }
340
4
                let encoded_key = self.encode_key(key);
341
4
                let pipeline = client.pipeline();
342
4
                pipeline
343
4
                    .strlen::<(), _>(encoded_key.as_ref())
344
4
                    .await
345
4
                    .err_tip(|| 
{0
346
0
                        format!("In RedisStore::has_with_results::strlen for {encoded_key}")
347
0
                    })?;
348
                // Redis returns 0 when the key doesn't exist
349
                // AND when the key exists with value of length 0.
350
                // Therefore, we need to check both length and existence
351
                // and do it in a pipeline for efficiency.
352
4
                pipeline
353
4
                    .exists::<(), _>(encoded_key.as_ref())
354
4
                    .await
355
4
                    .err_tip(|| 
{0
356
0
                        format!("In RedisStore::has_with_results::exists for {encoded_key}")
357
0
                    })?;
358
4
                let (blob_len, exists) = pipeline
359
4
                    .all::<(u64, bool)>()
360
4
                    .await
361
4
                    .err_tip(|| "In RedisStore::has_with_results::query")
?0
;
362
363
4
                *result = if exists { Some(blob_len) } else { 
None0
};
  Branch (363:30): [True: 4, False: 0]
  Branch (363:30): [Folded - Ignored]
364
365
4
                Ok::<_, Error>(())
366
12
            })
367
6
            .collect::<FuturesUnordered<_>>()
368
6
            .try_collect()
369
6
            .await
370
12
    }
371
372
    async fn list(
373
        self: Pin<&Self>,
374
        range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
375
        handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
376
16
    ) -> Result<u64, Error> {
377
8
        let range = (
378
8
            range.0.map(StoreKey::into_owned),
379
8
            range.1.map(StoreKey::into_owned),
380
8
        );
381
8
        let pattern = match range.0 {
382
6
            Bound::Included(ref start) | Bound::Excluded(
ref start0
) => match range.1 {
383
3
                Bound::Included(
ref end2
) | Bound::Excluded(ref end) => {
384
5
                    let start = start.as_str();
385
5
                    let end = end.as_str();
386
5
                    let max_length = start.len().min(end.len());
387
5
                    let length = start
388
5
                        .chars()
389
5
                        .zip(end.chars())
390
20
                        .
position5
(|(a, b)| a != b)
391
5
                        .unwrap_or(max_length);
392
5
                    format!("{}{}*", self.key_prefix, &start[..length])
393
                }
394
1
                Bound::Unbounded => format!("{}*", self.key_prefix),
395
            },
396
2
            Bound::Unbounded => format!("{}*", self.key_prefix),
397
        };
398
8
        let client = self.client_pool.next();
399
8
        let mut scan_stream = client.scan(pattern, Some(self.scan_count), None);
400
8
        let mut iterations = 0;
401
15
        'outer: while let Some(
mut page7
) = scan_stream.try_next().await
?0
{
  Branch (401:27): [True: 7, False: 8]
  Branch (401:27): [Folded - Ignored]
402
7
            if let Some(keys) = page.take_results() {
  Branch (402:20): [True: 7, False: 0]
  Branch (402:20): [Folded - Ignored]
403
28
                for 
key21
in keys {
404
                    // TODO: Notification of conversion errors
405
                    // Any results that do not conform to expectations are ignored.
406
21
                    if let Some(key) = key.as_str() {
  Branch (406:28): [True: 21, False: 0]
  Branch (406:28): [Folded - Ignored]
407
21
                        if let Some(key) = key.strip_prefix(&self.key_prefix) {
  Branch (407:32): [True: 21, False: 0]
  Branch (407:32): [Folded - Ignored]
408
21
                            let key = StoreKey::new_str(key);
409
21
                            if range.contains(&key) {
  Branch (409:32): [True: 13, False: 8]
  Branch (409:32): [Folded - Ignored]
410
13
                                iterations += 1;
411
13
                                if !handler(&key) {
  Branch (411:36): [True: 0, False: 13]
  Branch (411:36): [Folded - Ignored]
412
0
                                    break 'outer;
413
13
                                }
414
8
                            }
415
0
                        }
416
0
                    }
417
                }
418
0
            }
419
7
            page.next();
420
        }
421
8
        Ok(iterations)
422
16
    }
423
424
    async fn update(
425
        self: Pin<&Self>,
426
        key: StoreKey<'_>,
427
        mut reader: DropCloserReadHalf,
428
        _upload_size: UploadSizeInfo,
429
14
    ) -> Result<(), Error> {
430
7
        let final_key = self.encode_key(&key);
431
432
        // While the name generation function can be supplied by the user, we need to have the curly
433
        // braces in place in order to manage redis' hashing behavior and make sure that the temporary
434
        // key name and the final key name are directed to the same cluster node. See
435
        // https://redis.io/blog/redis-clustering-best-practices-with-keys/
436
        //
437
        // The TL;DR is that if we're in cluster mode and the names hash differently, we can't use request
438
        // pipelining. By using these braces, we tell redis to only hash the part of the temporary key that's
439
        // identical to the final key -- so they will always hash to the same node.
440
7
        let temp_key = format!(
441
7
            "temp-{}-{{{}}}",
442
7
            (self.temp_name_generator_fn)(),
443
7
            &final_key
444
        );
445
446
7
        if is_zero_digest(key.borrow()) {
  Branch (446:12): [True: 2, False: 5]
  Branch (446:12): [Folded - Ignored]
447
2
            let chunk = reader
448
2
                .peek()
449
2
                .await
450
2
                .err_tip(|| "Failed to peek in RedisStore::update")
?0
;
451
2
            if chunk.is_empty() {
  Branch (451:16): [True: 2, False: 0]
  Branch (451:16): [Folded - Ignored]
452
2
                reader
453
2
                    .drain()
454
2
                    .await
455
2
                    .err_tip(|| "Failed to drain in RedisStore::update")
?0
;
456
                // Zero-digest keys are special -- we don't need to do anything with it.
457
2
                return Ok(());
458
0
            }
459
5
        }
460
461
5
        let client = self.client_pool.next();
462
463
5
        let mut read_stream = reader
464
6
            .
scan5
(0u32, |bytes_read, chunk_res| {
465
6
                future::ready(Some(
466
6
                    chunk_res
467
6
                        .err_tip(|| "Failed to read chunk in update in redis store")
468
6
                        .and_then(|chunk| 
{5
469
5
                            let offset = *bytes_read;
470
5
                            let chunk_len = u32::try_from(chunk.len()).err_tip(
471
                                || "Could not convert chunk length to u32 in RedisStore::update",
472
0
                            )?;
473
5
                            let new_bytes_read = bytes_read
474
5
                                .checked_add(chunk_len)
475
5
                                .err_tip(|| "Overflow protection in RedisStore::update")
?0
;
476
5
                            *bytes_read = new_bytes_read;
477
5
                            Ok::<_, Error>((offset, *bytes_read, chunk))
478
5
                        }),
479
                ))
480
6
            })
481
6
            .
map5
(|res| {
482
6
                let (
offset5
,
end_pos5
,
chunk5
) = res
?1
;
483
5
                let temp_key_ref = &temp_key;
484
5
                Ok(async move {
485
5
                    client
486
5
                        .setrange::<(), _, _>(temp_key_ref, offset, chunk)
487
5
                        .await
488
5
                        .err_tip(
489
                            || "While appending to append to temp key in RedisStore::update",
490
0
                        )?;
491
5
                    Ok::<u32, Error>(end_pos)
492
5
                })
493
6
            })
494
5
            .try_buffer_unordered(self.max_chunk_uploads_per_update);
495
496
5
        let mut total_len: u32 = 0;
497
10
        while let Some(
last_pos5
) = read_stream.try_next().await
?1
{
  Branch (497:19): [True: 5, False: 4]
  Branch (497:19): [Folded - Ignored]
498
5
            if last_pos > total_len {
  Branch (498:16): [True: 5, False: 0]
  Branch (498:16): [Folded - Ignored]
499
5
                total_len = last_pos;
500
5
            
}0
501
        }
502
503
4
        let blob_len = client
504
4
            .strlen::<u64, _>(&temp_key)
505
4
            .await
506
4
            .err_tip(|| format!(
"In RedisStore::update strlen check for {temp_key}"0
))
?0
;
507
        // This is a safety check to ensure that in the event some kind of retry was to happen
508
        // and the data was appended to the key twice, we reject the data.
509
4
        if blob_len != u64::from(total_len) {
  Branch (509:12): [True: 0, False: 4]
  Branch (509:12): [Folded - Ignored]
510
0
            return Err(make_input_err!(
511
0
                "Data length mismatch in RedisStore::update for {}({}) - expected {} bytes, got {} bytes",
512
0
                key.borrow().as_str(),
513
0
                temp_key,
514
0
                total_len,
515
0
                blob_len,
516
0
            ));
517
4
        }
518
519
        // Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost.
520
4
        client
521
4
            .rename::<(), _, _>(&temp_key, final_key.as_ref())
522
4
            .await
523
4
            .err_tip(|| "While queueing key rename in RedisStore::update()")
?0
;
524
525
        // If we have a publish channel configured, send a notice that the key has been set.
526
4
        if let Some(
pub_sub_channel0
) = &self.pub_sub_channel {
  Branch (526:16): [True: 0, False: 4]
  Branch (526:16): [Folded - Ignored]
527
0
            return Ok(client.publish(pub_sub_channel, final_key.as_ref()).await?);
528
4
        }
529
530
4
        Ok(())
531
14
    }
532
533
    async fn get_part(
534
        self: Pin<&Self>,
535
        key: StoreKey<'_>,
536
        writer: &mut DropCloserWriteHalf,
537
        offset: u64,
538
        length: Option<u64>,
539
10
    ) -> Result<(), Error> {
540
5
        let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")
?0
;
541
5
        let length = length
542
5
            .map(|v| 
usize::try_from4
(
v4
).
err_tip4
(|| "Could not convert length to usize"))
543
5
            .transpose()
?0
;
544
545
        // To follow RBE spec we need to consider any digest's with
546
        // zero size to be existing.
547
5
        if is_zero_digest(key.borrow()) {
  Branch (547:12): [True: 0, False: 5]
  Branch (547:12): [Folded - Ignored]
548
0
            return writer
549
0
                .send_eof()
550
0
                .err_tip(|| "Failed to send zero EOF in redis store get_part");
551
5
        }
552
553
5
        let client = self.client_pool.next();
554
5
        let encoded_key = self.encode_key(&key);
555
5
        let encoded_key = encoded_key.as_ref();
556
557
        // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we
558
        // do math with indices we change them to be exclusive at the end.
559
560
        // We want to read the data at the key from `offset` to `offset + length`.
561
5
        let data_start = offset;
562
5
        let data_end = data_start
563
5
            .saturating_add(length.unwrap_or(isize::MAX as usize))
564
5
            .saturating_sub(1);
565
566
        // And we don't ever want to read more than `read_chunk_size` bytes at a time, so we'll need to iterate.
567
5
        let mut chunk_start = data_start;
568
5
        let mut chunk_end = cmp::min(
569
5
            data_start.saturating_add(self.read_chunk_size) - 1,
570
5
            data_end,
571
        );
572
573
        loop {
574
8
            let chunk: Bytes = client
575
8
                .getrange(encoded_key, chunk_start, chunk_end)
576
8
                .await
577
8
                .err_tip(|| "In RedisStore::get_part::getrange")
?0
;
578
579
8
            let didnt_receive_full_chunk = chunk.len() < self.read_chunk_size;
580
8
            let reached_end_of_data = chunk_end == data_end;
581
582
8
            if didnt_receive_full_chunk || 
reached_end_of_data4
{
  Branch (582:16): [True: 4, False: 4]
  Branch (582:44): [True: 1, False: 3]
  Branch (582:16): [Folded - Ignored]
  Branch (582:44): [Folded - Ignored]
583
5
                if !chunk.is_empty() {
  Branch (583:20): [True: 4, False: 1]
  Branch (583:20): [Folded - Ignored]
584
4
                    writer
585
4
                        .send(chunk)
586
4
                        .await
587
4
                        .err_tip(|| "Failed to write data in RedisStore::get_part")
?0
;
588
1
                }
589
590
5
                break; // No more data to read.
591
3
            }
592
593
            // We received a full chunk's worth of data, so write it...
594
3
            writer
595
3
                .send(chunk)
596
3
                .await
597
3
                .err_tip(|| "Failed to write data in RedisStore::get_part")
?0
;
598
599
            // ...and go grab the next chunk.
600
3
            chunk_start = chunk_end + 1;
601
3
            chunk_end = cmp::min(
602
3
                chunk_start.saturating_add(self.read_chunk_size) - 1,
603
3
                data_end,
604
3
            );
605
        }
606
607
        // If we didn't write any data, check if the key exists, if not return a NotFound error.
608
        // This is required by spec.
609
5
        if writer.get_bytes_written() == 0 {
  Branch (609:12): [True: 1, False: 4]
  Branch (609:12): [Folded - Ignored]
610
            // We're supposed to read 0 bytes, so just check if the key exists.
611
1
            let exists = client
612
1
                .exists::<bool, _>(encoded_key)
613
1
                .await
614
1
                .err_tip(|| "In RedisStore::get_part::zero_exists")
?0
;
615
616
1
            if !exists {
  Branch (616:16): [True: 1, False: 0]
  Branch (616:16): [Folded - Ignored]
617
1
                return Err(make_err!(
618
1
                    Code::NotFound,
619
1
                    "Data not found in Redis store for digest: {key:?}"
620
1
                ));
621
0
            }
622
4
        }
623
624
4
        writer
625
4
            .send_eof()
626
4
            .err_tip(|| "Failed to write EOF in redis store get_part")
627
10
    }
628
629
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
630
0
        self
631
0
    }
632
633
0
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) {
634
0
        self
635
0
    }
636
637
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> {
638
0
        self
639
0
    }
640
641
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
642
0
        registry.register_indicator(self);
643
0
    }
644
}
645
646
#[async_trait]
647
impl HealthStatusIndicator for RedisStore {
648
0
    fn get_name(&self) -> &'static str {
649
0
        "RedisStore"
650
0
    }
651
652
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
653
0
        StoreDriver::check_health(Pin::new(self), namespace).await
654
0
    }
655
}
656
657
// -------------------------------------------------------------------
658
// Below this line are specific to the redis scheduler implementation.
659
// -------------------------------------------------------------------
660
661
/// The maximum number of results to return per cursor.
662
const MAX_COUNT_PER_CURSOR: u64 = 256;
663
/// The time in milliseconds that a redis cursor can be idle before it is closed.
664
const CURSOR_IDLE_MS: u64 = 2_000;
665
/// The name of the field in the Redis hash that stores the data.
666
const DATA_FIELD_NAME: &str = "data";
667
/// The name of the field in the Redis hash that stores the version.
668
const VERSION_FIELD_NAME: &str = "version";
669
/// The time to live of indexes in seconds. After this time redis may delete the index.
670
const INDEX_TTL_S: u64 = 60 * 60 * 24; // 24 hours.
671
672
/// Lua script to set a key if the version matches.
673
/// Args:
674
///   KEYS[1]: The key where the version is stored.
675
///   ARGV[1]: The expected version.
676
///   ARGV[2]: The new data.
677
///   ARGV[3*]: Key-value pairs of additional data to include.
678
/// Returns:
679
///   The new version if the version matches. nil is returned if the
680
///   value was not set.
681
const LUA_VERSION_SET_SCRIPT: &str = formatcp!(
682
    r"
683
local key = KEYS[1]
684
local expected_version = tonumber(ARGV[1])
685
local new_data = ARGV[2]
686
local new_version = redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', 1)
687
local i
688
local indexes = {{}}
689
690
if new_version-1 ~= expected_version then
691
    redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', -1)
692
    return 0
693
end
694
-- Skip first 2 argvs, as they are known inputs.
695
-- Remember: Lua is 1-indexed.
696
for i=3, #ARGV do
697
    indexes[i-2] = ARGV[i]
698
end
699
700
-- In testing we witnessed redis sometimes not update our FT indexes
701
-- resulting in stale data. It appears if we delete our keys then insert
702
-- them again it works and reduces risk significantly.
703
redis.call('DEL', key)
704
redis.call('HSET', key, '{DATA_FIELD_NAME}', new_data, '{VERSION_FIELD_NAME}', new_version, unpack(indexes))
705
706
return new_version
707
"
708
);
709
710
/// This is the output of the calculations below hardcoded into the executable.
711
const FINGERPRINT_CREATE_INDEX_HEX: &str = "3e762c15";
712
713
#[cfg(test)]
714
mod test {
715
    use super::FINGERPRINT_CREATE_INDEX_HEX;
716
717
    /// String of the `FT.CREATE` command used to create the index template.
718
    const CREATE_INDEX_TEMPLATE: &str = "FT.CREATE {} ON HASH PREFIX 1 {} NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA {} TAG CASESENSITIVE SORTABLE";
719
720
    /// Compile-time fingerprint of the `FT.CREATE` command used to create the
721
    /// index template. This is a simple CRC32 checksum of the command string.
722
    /// We don't care about it actually being a valid CRC32 checksum, just that
723
    /// it's a unique identifier with a low chance of collision.
724
1
    const fn fingerprint_create_index_template() -> u32 {
725
        const POLY: u32 = 0xEDB8_8320;
726
        const DATA: &[u8] = CREATE_INDEX_TEMPLATE.as_bytes();
727
1
        let mut crc = 0xFFFF_FFFF;
728
1
        let mut i = 0;
729
102
        while i < DATA.len() {
  Branch (729:15): [True: 101, False: 1]
730
101
            let byte = DATA[i];
731
101
            crc ^= byte as u32;
732
733
101
            let mut j = 0;
734
909
            while j < 8 {
  Branch (734:19): [True: 808, False: 101]
735
808
                crc = if crc & 1 != 0 {
  Branch (735:26): [True: 386, False: 422]
736
386
                    (crc >> 1) ^ POLY
737
                } else {
738
422
                    crc >> 1
739
                };
740
808
                j += 1;
741
            }
742
101
            i += 1;
743
        }
744
1
        crc
745
1
    }
746
747
    /// Verify that our calculation always evaluates to this fixed value.
748
    #[test]
749
1
    fn test_fingerprint_value() {
750
1
        assert_eq!(
751
1
            format!("{:08x}", &fingerprint_create_index_template()),
752
            FINGERPRINT_CREATE_INDEX_HEX,
753
        );
754
1
    }
755
}
756
757
/// Get the name of the index to create for the given field.
758
/// This will add some prefix data to the name to try and ensure
759
/// if the index definition changes, the name will get a new name.
760
macro_rules! get_index_name {
761
    ($prefix:expr, $field:expr, $maybe_sort:expr) => {
762
        format_args!(
763
            "{}_{}_{}_{}",
764
            $prefix,
765
            $field,
766
            $maybe_sort.unwrap_or(""),
767
            FINGERPRINT_CREATE_INDEX_HEX
768
        )
769
    };
770
}
771
772
/// Try to sanitize a string to be used as a Redis key.
773
/// We don't actually modify the string, just check if it's valid.
774
2
const fn try_sanitize(s: &str) -> Option<&str> {
775
    // Note: We cannot use for loops or iterators here because they are not const.
776
    // Allowing us to use a const function here gives the compiler the ability to
777
    // optimize this function away entirely in the case where the input is constant.
778
2
    let chars = s.as_bytes();
779
2
    let mut i: usize = 0;
780
2
    let len = s.len();
781
    loop {
782
180
        if i >= len {
  Branch (782:12): [True: 2, False: 178]
  Branch (782:12): [Folded - Ignored]
783
2
            break;
784
178
        }
785
178
        let c = chars[i];
786
178
        if !c.is_ascii_alphanumeric() && 
c != b'_'10
{
  Branch (786:12): [True: 10, False: 168]
  Branch (786:42): [True: 0, False: 10]
  Branch (786:12): [Folded - Ignored]
  Branch (786:42): [Folded - Ignored]
787
0
            return None;
788
178
        }
789
178
        i += 1;
790
    }
791
2
    Some(s)
792
2
}
793
794
/// An individual subscription to a key in Redis.
795
#[derive(Debug)]
796
pub struct RedisSubscription {
797
    receiver: Option<tokio::sync::watch::Receiver<String>>,
798
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
799
}
800
801
impl SchedulerSubscription for RedisSubscription {
802
    /// Wait for the subscription key to change.
803
7
    async fn changed(&mut self) -> Result<(), Error> {
804
7
        let receiver = self
805
7
            .receiver
806
7
            .as_mut()
807
7
            .ok_or_else(|| make_err!(
Code::Internal0
, "In RedisSubscription::changed::as_mut"))
?0
;
808
7
        receiver
809
7
            .changed()
810
7
            .await
811
6
            .map_err(|_| make_err!(
Code::Internal0
, "In RedisSubscription::changed::changed"))
812
6
    }
813
}
814
815
// If the subscription is dropped, we need to possibly remove the key from the
816
// subscribed keys map.
817
impl Drop for RedisSubscription {
818
2
    fn drop(&mut self) {
819
2
        let Some(receiver) = self.receiver.take() else {
  Branch (819:13): [True: 2, False: 0]
  Branch (819:13): [Folded - Ignored]
820
0
            warn!("RedisSubscription has already been dropped, nothing to do.");
821
0
            return; // Already dropped, nothing to do.
822
        };
823
2
        let key = receiver.borrow().clone();
824
        // IMPORTANT: This must be dropped before receiver_count() is called.
825
2
        drop(receiver);
826
2
        let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else {
  Branch (826:13): [True: 2, False: 0]
  Branch (826:13): [Folded - Ignored]
827
0
            return; // Already dropped, nothing to do.
828
        };
829
2
        let mut subscribed_keys = subscribed_keys.write();
830
2
        let Some(value) = subscribed_keys.get(&key) else {
  Branch (830:13): [True: 2, False: 0]
  Branch (830:13): [Folded - Ignored]
831
0
            error!(
832
0
                "Key {key} was not found in subscribed keys when checking if it should be removed."
833
            );
834
0
            return;
835
        };
836
        // If we have no receivers, cleanup the entry from our map.
837
2
        if value.receiver_count() == 0 {
  Branch (837:12): [True: 2, False: 0]
  Branch (837:12): [Folded - Ignored]
838
2
            subscribed_keys.remove(key);
839
2
        
}0
840
2
    }
841
}
842
843
/// A publisher for a key in Redis.
844
#[derive(Debug)]
845
struct RedisSubscriptionPublisher {
846
    sender: Mutex<tokio::sync::watch::Sender<String>>,
847
}
848
849
impl RedisSubscriptionPublisher {
850
2
    fn new(
851
2
        key: String,
852
2
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
853
2
    ) -> (Self, RedisSubscription) {
854
2
        let (sender, receiver) = tokio::sync::watch::channel(key);
855
2
        let publisher = Self {
856
2
            sender: Mutex::new(sender),
857
2
        };
858
2
        let subscription = RedisSubscription {
859
2
            receiver: Some(receiver),
860
2
            weak_subscribed_keys,
861
2
        };
862
2
        (publisher, subscription)
863
2
    }
864
865
0
    fn subscribe(
866
0
        &self,
867
0
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
868
0
    ) -> RedisSubscription {
869
0
        let receiver = self.sender.lock().subscribe();
870
0
        RedisSubscription {
871
0
            receiver: Some(receiver),
872
0
            weak_subscribed_keys,
873
0
        }
874
0
    }
875
876
2
    fn receiver_count(&self) -> usize {
877
2
        self.sender.lock().receiver_count()
878
2
    }
879
880
4
    fn notify(&self) {
881
        // TODO(https://github.com/sile/patricia_tree/issues/40) When this is addressed
882
        // we can remove the `Mutex` and use the mutable iterator directly.
883
4
        self.sender.lock().send_modify(|_| {});
884
4
    }
885
}
886
887
#[derive(Debug)]
888
pub struct RedisSubscriptionManager {
889
    subscribed_keys: Arc<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
890
    tx_for_test: tokio::sync::mpsc::UnboundedSender<String>,
891
    _subscription_spawn: JoinHandleDropGuard<()>,
892
}
893
894
impl RedisSubscriptionManager {
895
1
    pub fn new(subscribe_client: SubscriberClient, pub_sub_channel: String) -> Self {
896
1
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
897
1
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);
898
1
        let (tx_for_test, mut rx_for_test) = tokio::sync::mpsc::unbounded_channel();
899
        Self {
900
1
            subscribed_keys,
901
1
            tx_for_test,
902
1
            _subscription_spawn: spawn!("redis_subscribe_spawn", async move {
903
1
                let mut rx = subscribe_client.message_rx();
904
                loop {
905
1
                    if let Err(
e0
) = subscribe_client.subscribe(&pub_sub_channel).await {
  Branch (905:28): [True: 0, False: 1]
  Branch (905:28): [Folded - Ignored]
906
0
                        error!("Error subscribing to pattern - {e}");
907
0
                        return;
908
1
                    }
909
1
                    let mut reconnect_rx = subscribe_client.reconnect_rx();
910
1
                    let reconnect_fut = reconnect_rx.recv().fuse();
911
1
                    tokio::pin!(reconnect_fut);
912
                    loop {
913
4
                        let 
key3
= select! {
914
4
                            
value3
= rx_for_test.recv() => {
915
3
                                let Some(value) = value else {
  Branch (915:37): [True: 3, False: 0]
  Branch (915:37): [Folded - Ignored]
916
0
                                    unreachable!("Channel should never close");
917
                                };
918
3
                                value.into()
919
                            },
920
4
                            
msg0
= rx.recv() => {
921
0
                                match msg {
922
0
                                    Ok(msg) => {
923
0
                                        if let RedisValue::String(s) = msg.value {
  Branch (923:48): [True: 0, False: 0]
  Branch (923:48): [Folded - Ignored]
924
0
                                            s
925
                                        } else {
926
0
                                            error!("Received non-string message in RedisSubscriptionManager");
927
0
                                            continue;
928
                                        }
929
                                    },
930
0
                                    Err(e) => {
931
                                        // Check to see if our parent has been dropped and if so kill spawn.
932
0
                                        if subscribed_keys_weak.upgrade().is_none() {
  Branch (932:44): [True: 0, False: 0]
  Branch (932:44): [Folded - Ignored]
933
0
                                            warn!("It appears our parent has been dropped, exiting RedisSubscriptionManager spawn");
934
0
                                            return;
935
0
                                        }
936
0
                                        error!("Error receiving message in RedisSubscriptionManager reconnecting and flagging everything changed - {e}");
937
0
                                        break;
938
                                    }
939
                                }
940
                            },
941
4
                            _ = &mut reconnect_fut => {
942
0
                                warn!("Redis reconnected flagging all subscriptions as changed and resuming");
943
0
                                break;
944
                            }
945
                        };
946
3
                        let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
  Branch (946:29): [True: 3, False: 0]
  Branch (946:29): [Folded - Ignored]
947
0
                            warn!(
948
0
                                "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
949
                            );
950
0
                            return;
951
                        };
952
3
                        let subscribed_keys_mux = subscribed_keys.read();
953
3
                        subscribed_keys_mux
954
3
                            .common_prefix_values(&*key)
955
3
                            .for_each(RedisSubscriptionPublisher::notify);
956
                    }
957
                    // Sleep for a small amount of time to ensure we don't reconnect too quickly.
958
0
                    sleep(Duration::from_secs(1)).await;
959
                    // If we reconnect or lag behind we might have had dirty keys, so we need to
960
                    // flag all of them as changed.
961
0
                    let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
  Branch (961:25): [True: 0, False: 0]
  Branch (961:25): [Folded - Ignored]
962
0
                        warn!(
963
0
                            "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
964
                        );
965
0
                        return;
966
                    };
967
0
                    let subscribed_keys_mux = subscribed_keys.read();
968
                    // Just in case also get a new receiver.
969
0
                    rx = subscribe_client.message_rx();
970
                    // Drop all buffered messages, then flag everything as changed.
971
0
                    rx.resubscribe();
972
0
                    for publisher in subscribed_keys_mux.values() {
973
0
                        publisher.notify();
974
0
                    }
975
                }
976
0
            }),
977
        }
978
1
    }
979
}
980
981
impl SchedulerSubscriptionManager for RedisSubscriptionManager {
982
    type Subscription = RedisSubscription;
983
984
3
    fn notify_for_test(&self, value: String) {
985
3
        self.tx_for_test.send(value).unwrap();
986
3
    }
987
988
2
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
989
2
    where
990
2
        K: SchedulerStoreKeyProvider,
991
    {
992
2
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
993
2
        let mut subscribed_keys = self.subscribed_keys.write();
994
2
        let key = key.get_key();
995
2
        let key_str = key.as_str();
996
2
        let mut subscription = if let Some(
publisher0
) = subscribed_keys.get(&key_str) {
  Branch (996:39): [True: 0, False: 0]
  Branch (996:39): [Folded - Ignored]
  Branch (996:39): [Folded - Ignored]
  Branch (996:39): [True: 0, False: 2]
997
0
            publisher.subscribe(weak_subscribed_keys)
998
        } else {
999
2
            let (publisher, subscription) =
1000
2
                RedisSubscriptionPublisher::new(key_str.to_string(), weak_subscribed_keys);
1001
2
            subscribed_keys.insert(key_str, publisher);
1002
2
            subscription
1003
        };
1004
2
        subscription
1005
2
            .receiver
1006
2
            .as_mut()
1007
2
            .ok_or_else(|| 
{0
1008
0
                make_err!(
1009
0
                    Code::Internal,
1010
                    "Receiver should be set in RedisSubscriptionManager::subscribe"
1011
                )
1012
0
            })?
1013
2
            .mark_changed();
1014
1015
2
        Ok(subscription)
1016
2
    }
1017
}
1018
1019
impl SchedulerStore for RedisStore {
1020
    type SubscriptionManager = RedisSubscriptionManager;
1021
1022
3
    fn subscription_manager(&self) -> Result<Arc<RedisSubscriptionManager>, Error> {
1023
3
        let mut subscription_manager = self.subscription_manager.lock();
1024
1025
3
        if let Some(
subscription_manager2
) = &*subscription_manager {
  Branch (1025:16): [True: 2, False: 1]
  Branch (1025:16): [Folded - Ignored]
1026
2
            Ok(subscription_manager.clone())
1027
        } else {
1028
1
            let Some(pub_sub_channel) = &self.pub_sub_channel else {
  Branch (1028:17): [True: 1, False: 0]
  Branch (1028:17): [Folded - Ignored]
1029
0
                return Err(make_input_err!(
1030
0
                    "RedisStore must have a pubsub channel for a Redis Scheduler if using subscriptions"
1031
0
                ));
1032
            };
1033
1
            let sub = Arc::new(RedisSubscriptionManager::new(
1034
1
                self.subscriber_client.clone(),
1035
1
                pub_sub_channel.clone(),
1036
            ));
1037
1
            *subscription_manager = Some(sub.clone());
1038
1
            Ok(sub)
1039
        }
1040
3
    }
1041
1042
3
    async fn update_data<T>(&self, data: T) -> Result<Option<u64>, Error>
1043
3
    where
1044
3
        T: SchedulerStoreDataProvider
1045
3
            + SchedulerStoreKeyProvider
1046
3
            + SchedulerCurrentVersionProvider
1047
3
            + Send,
1048
3
    {
1049
3
        let key = data.get_key();
1050
3
        let key = self.encode_key(&key);
1051
3
        let client = self.client_pool.next();
1052
3
        let maybe_index = data.get_indexes().err_tip(|| 
{0
1053
0
            format!("Err getting index in RedisStore::update_data::versioned for {key:?}")
1054
0
        })?;
1055
3
        if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {
  Branch (1055:12): [True: 0, False: 0]
  Branch (1055:12): [True: 0, False: 0]
  Branch (1055:12): [Folded - Ignored]
  Branch (1055:12): [Folded - Ignored]
  Branch (1055:12): [True: 0, False: 1]
  Branch (1055:12): [True: 2, False: 0]
1056
2
            let current_version = data.current_version();
1057
2
            let data = data.try_into_bytes().err_tip(|| 
{0
1058
0
                format!("Could not convert value to bytes in RedisStore::update_data::versioned for {key:?}")
1059
0
            })?;
1060
2
            let mut argv = Vec::with_capacity(3 + maybe_index.len() * 2);
1061
2
            argv.push(Bytes::from(format!("{current_version}")));
1062
2
            argv.push(data);
1063
8
            for (
name6
,
value6
) in maybe_index {
1064
6
                argv.push(Bytes::from_static(name.as_bytes()));
1065
6
                argv.push(value);
1066
6
            }
1067
2
            let new_version = self
1068
2
                .update_if_version_matches_script
1069
2
                .evalsha_with_reload::<u64, _, Vec<Bytes>>(client, vec![key.as_ref()], argv)
1070
2
                .await
1071
2
                .err_tip(|| format!(
"In RedisStore::update_data::versioned for {key:?}"0
))
?0
;
1072
2
            if new_version == 0 {
  Branch (1072:16): [True: 0, False: 0]
  Branch (1072:16): [True: 0, False: 0]
  Branch (1072:16): [Folded - Ignored]
  Branch (1072:16): [Folded - Ignored]
  Branch (1072:16): [True: 0, False: 0]
  Branch (1072:16): [True: 0, False: 2]
1073
0
                return Ok(None);
1074
2
            }
1075
            // If we have a publish channel configured, send a notice that the key has been set.
1076
2
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (1076:20): [True: 0, False: 0]
  Branch (1076:20): [True: 0, False: 0]
  Branch (1076:20): [Folded - Ignored]
  Branch (1076:20): [Folded - Ignored]
  Branch (1076:20): [True: 0, False: 0]
  Branch (1076:20): [True: 2, False: 0]
1077
2
                return Ok(client.publish(pub_sub_channel, key.as_ref()).await
?0
);
1078
0
            }
1079
0
            Ok(Some(new_version))
1080
        } else {
1081
1
            let data = data.try_into_bytes().err_tip(|| 
{0
1082
0
                format!("Could not convert value to bytes in RedisStore::update_data::noversion for {key:?}")
1083
0
            })?;
1084
1
            let mut fields = RedisMap::new();
1085
1
            fields.reserve(1 + maybe_index.len());
1086
1
            fields.insert(DATA_FIELD_NAME.into(), data.into());
1087
1
            for (
name0
,
value0
) in maybe_index {
1088
0
                fields.insert(name.into(), value.into());
1089
0
            }
1090
1
            client
1091
1
                .hset::<(), _, _>(key.as_ref(), fields)
1092
1
                .await
1093
1
                .err_tip(|| format!(
"In RedisStore::update_data::noversion for {key:?}"0
))
?0
;
1094
            // If we have a publish channel configured, send a notice that the key has been set.
1095
1
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (1095:20): [True: 0, False: 0]
  Branch (1095:20): [True: 0, False: 0]
  Branch (1095:20): [Folded - Ignored]
  Branch (1095:20): [Folded - Ignored]
  Branch (1095:20): [True: 1, False: 0]
  Branch (1095:20): [True: 0, False: 0]
1096
1
                return Ok(client.publish(pub_sub_channel, key.as_ref()).await
?0
);
1097
0
            }
1098
0
            Ok(Some(0)) // Always use "0" version since this is not a versioned request.
1099
        }
1100
3
    }
1101
1102
1
    async fn search_by_index_prefix<K>(
1103
1
        &self,
1104
1
        index: K,
1105
1
    ) -> Result<
1106
1
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
1107
1
        Error,
1108
1
    >
1109
1
    where
1110
1
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
1111
1
    {
1112
1
        let index_value = index.index_value();
1113
2
        let 
run_ft_aggregate1
= || {
1114
2
            let client = self.client_pool.next().clone();
1115
2
            let sanitized_field = try_sanitize(index_value.as_ref()).err_tip(|| 
{0
1116
0
                format!("In RedisStore::search_by_index_prefix::try_sanitize - {index_value:?}")
1117
0
            })?;
1118
2
            Ok::<_, Error>(async move {
1119
2
                ft_aggregate(
1120
2
                    client,
1121
2
                    format!(
1122
2
                        "{}",
1123
2
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1124
                    ),
1125
2
                    format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_field),
1126
                    FtAggregateOptions {
1127
2
                        load: Some(Load::Some(vec![
1128
2
                            SearchField {
1129
2
                                identifier: DATA_FIELD_NAME.into(),
1130
2
                                property: None,
1131
2
                            },
1132
2
                            SearchField {
1133
2
                                identifier: VERSION_FIELD_NAME.into(),
1134
2
                                property: None,
1135
2
                            },
1136
2
                        ])),
1137
2
                        cursor: Some(WithCursor {
1138
2
                            count: Some(MAX_COUNT_PER_CURSOR),
1139
2
                            max_idle: Some(CURSOR_IDLE_MS),
1140
2
                        }),
1141
2
                        pipeline: vec![AggregateOperation::SortBy {
1142
2
                            properties: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| 
{0
1143
0
                                vec![(format!("@{v}").into(), SortOrder::Asc)]
1144
0
                            }),
1145
2
                            max: None,
1146
                        }],
1147
2
                        ..Default::default()
1148
                    },
1149
                )
1150
2
                .await
1151
2
            })
1152
2
        };
1153
1
        let stream = run_ft_aggregate()
?0
1154
1
            .or_else(|_| async move {
1155
1
                let mut schema = vec![SearchSchema {
1156
1
                    field_name: K::INDEX_NAME.into(),
1157
1
                    alias: None,
1158
1
                    kind: SearchSchemaKind::Tag {
1159
1
                        sortable: false,
1160
1
                        unf: false,
1161
1
                        separator: None,
1162
1
                        casesensitive: false,
1163
1
                        withsuffixtrie: false,
1164
1
                        noindex: false,
1165
1
                    },
1166
1
                }];
1167
1
                if let Some(
sort_key0
) = K::MAYBE_SORT_KEY {
  Branch (1167:24): [True: 0, False: 0]
  Branch (1167:24): [True: 0, False: 0]
  Branch (1167:24): [Folded - Ignored]
  Branch (1167:24): [Folded - Ignored]
  Branch (1167:24): [True: 0, False: 1]
1168
0
                    schema.push(SearchSchema {
1169
0
                        field_name: sort_key.into(),
1170
0
                        alias: None,
1171
0
                        kind: SearchSchemaKind::Tag {
1172
0
                            sortable: true,
1173
0
                            unf: false,
1174
0
                            separator: None,
1175
0
                            casesensitive: false,
1176
0
                            withsuffixtrie: false,
1177
0
                            noindex: false,
1178
0
                        },
1179
0
                    });
1180
1
                }
1181
1
                let create_result = self
1182
1
                    .client_pool
1183
1
                    .next()
1184
1
                    .ft_create::<(), _>(
1185
1
                        format!(
1186
1
                            "{}",
1187
1
                            get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1188
1
                        ),
1189
1
                        FtCreateOptions {
1190
1
                            on: Some(IndexKind::Hash),
1191
1
                            prefixes: vec![K::KEY_PREFIX.into()],
1192
1
                            nohl: true,
1193
1
                            nofields: true,
1194
1
                            nofreqs: true,
1195
1
                            nooffsets: true,
1196
1
                            temporary: Some(INDEX_TTL_S),
1197
1
                            ..Default::default()
1198
1
                        },
1199
1
                        schema,
1200
1
                    )
1201
1
                    .await
1202
1
                    .err_tip(|| 
{0
1203
0
                        format!(
1204
0
                            "Error with ft_create in RedisStore::search_by_index_prefix({})",
1205
0
                            get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1206
                        )
1207
0
                    });
1208
1
                let run_result = run_ft_aggregate()
?0
.await.err_tip(||
{0
1209
0
                    format!(
1210
0
                        "Error with second ft_aggregate in RedisStore::search_by_index_prefix({})",
1211
0
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1212
                    )
1213
0
                });
1214
                // Creating the index will race which is ok. If it fails to create, we only
1215
                // error if the second ft_aggregate call fails and fails to create.
1216
1
                run_result.or_else(move |e| 
create_result0
.
merge0
(
Err(e)0
))
1217
2
            })
1218
1
            .await
?0
;
1219
1
        Ok(stream.map(|result| 
{0
1220
0
            let mut redis_map =
1221
0
                result.err_tip(|| "Error in stream of in RedisStore::search_by_index_prefix")?;
1222
0
            let bytes_data = redis_map
1223
0
                .remove(&RedisKey::from_static_str(DATA_FIELD_NAME))
1224
0
                .err_tip(|| "Missing data field in RedisStore::search_by_index_prefix")?
1225
0
                .into_bytes()
1226
0
                .err_tip(|| {
1227
0
                    formatcp!("'{DATA_FIELD_NAME}' is not Bytes in RedisStore::search_by_index_prefix::into_bytes")
1228
0
                })?;
1229
0
            let version = if <K as SchedulerIndexProvider>::Versioned::VALUE {
  Branch (1229:30): [True: 0, False: 0]
  Branch (1229:30): [True: 0, False: 0]
  Branch (1229:30): [Folded - Ignored]
  Branch (1229:30): [Folded - Ignored]
  Branch (1229:30): [True: 0, False: 0]
1230
0
                redis_map
1231
0
                    .remove(&RedisKey::from_static_str(VERSION_FIELD_NAME))
1232
0
                    .err_tip(|| "Missing version field in RedisStore::search_by_index_prefix")?
1233
0
                    .as_u64()
1234
0
                    .err_tip(|| {
1235
0
                        formatcp!("'{VERSION_FIELD_NAME}' is not u64 in RedisStore::search_by_index_prefix::as_u64")
1236
0
                    })?
1237
            } else {
1238
0
                0
1239
            };
1240
0
            K::decode(version, bytes_data)
1241
0
                .err_tip(|| "In RedisStore::search_by_index_prefix::decode")
1242
0
        }))
1243
1
    }
1244
1245
2
    async fn get_and_decode<K>(
1246
2
        &self,
1247
2
        key: K,
1248
2
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
1249
2
    where
1250
2
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
1251
2
    {
1252
2
        let key = key.get_key();
1253
2
        let key = self.encode_key(&key);
1254
2
        let client = self.client_pool.next();
1255
2
        let (maybe_version, maybe_data) = client
1256
2
            .hmget::<(Option<u64>, Option<Bytes>), _, _>(
1257
2
                key.as_ref(),
1258
2
                vec![
1259
2
                    RedisKey::from(VERSION_FIELD_NAME),
1260
2
                    RedisKey::from(DATA_FIELD_NAME),
1261
2
                ],
1262
2
            )
1263
2
            .await
1264
2
            .err_tip(|| format!(
"In RedisStore::get_without_version::notversioned {key}"0
))
?0
;
1265
2
        let Some(data) = maybe_data else {
  Branch (1265:13): [True: 0, False: 0]
  Branch (1265:13): [True: 0, False: 0]
  Branch (1265:13): [Folded - Ignored]
  Branch (1265:13): [Folded - Ignored]
  Branch (1265:13): [True: 2, False: 0]
1266
0
            return Ok(None);
1267
        };
1268
2
        Ok(Some(K::decode(maybe_version.unwrap_or(0), data).err_tip(
1269
0
            || format!("In RedisStore::get_with_version::notversioned::decode {key}"),
1270
0
        )?))
1271
2
    }
1272
}