Coverage Report

Created: 2026-06-04 10:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/redis_store.rs
Line
Count
Source
1
// Copyright 2024-2026 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::fmt::Debug;
17
use core::marker::PhantomData;
18
use core::ops::{Bound, RangeBounds};
19
use core::pin::Pin;
20
use core::str::FromStr;
21
use core::time::Duration;
22
use std::borrow::Cow;
23
use std::collections::HashSet;
24
use std::sync::{Arc, Weak};
25
use std::time::Instant;
26
27
use async_trait::async_trait;
28
use bytes::Bytes;
29
use const_format::formatcp;
30
use futures::stream::FuturesUnordered;
31
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt, future};
32
use itertools::izip;
33
use nativelink_config::stores::{RedisMode, RedisSpec};
34
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
35
use nativelink_metric::MetricsComponent;
36
use nativelink_redis_tester::SubscriptionManagerNotify;
37
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
38
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
39
use nativelink_util::spawn;
40
use nativelink_util::store_trait::{
41
    BoolValue, RemoveItemCallback, SchedulerCurrentVersionProvider, SchedulerIndexProvider,
42
    SchedulerStore, SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
43
    SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,
44
};
45
use nativelink_util::task::JoinHandleDropGuard;
46
use parking_lot::{Mutex, RwLock};
47
use patricia_tree::StringPatriciaMap;
48
use redis::aio::{ConnectionLike, ConnectionManager, ConnectionManagerConfig};
49
use redis::cluster::ClusterClient;
50
use redis::cluster_async::ClusterConnection;
51
use redis::sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType};
52
use redis::{
53
    AsyncCommands, AsyncIter, Client, IntoConnectionInfo, PushInfo, ScanOptions, Script, Value,
54
    pipe,
55
};
56
use tokio::select;
57
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
58
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
59
use tokio::time::{sleep, timeout};
60
use tokio_stream::wrappers::UnboundedReceiverStream;
61
use tracing::{debug, error, info, trace, warn};
62
use url::Url;
63
use uuid::Uuid;
64
65
use crate::cas_utils::is_zero_digest;
66
use crate::redis_utils::{
67
    FtAggregateCursor, FtAggregateOptions, FtCreateOptions, SearchSchema, ft_aggregate, ft_create,
68
};
69
70
/// The default size of the read chunk when reading data from Redis.
71
/// Note: If this changes it should be updated in the config documentation.
72
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;
73
74
/// The default size of the connection pool if not specified.
75
/// Note: If this changes it should be updated in the config documentation.
76
const DEFAULT_CONNECTION_POOL_SIZE: usize = 3;
77
78
/// The default delay between retries if not specified.
79
/// Note: If this changes it should be updated in the config documentation.
80
const DEFAULT_RETRY_DELAY: f32 = 0.1;
81
82
/// The default connection timeout in milliseconds if not specified.
83
/// Note: If this changes it should be updated in the config documentation.
84
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;
85
86
/// The default command timeout in milliseconds if not specified.
87
/// Note: If this changes it should be updated in the config documentation.
88
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;
89
90
/// The default `check_health` PING ceiling in milliseconds if not specified.
91
/// Note: If this changes it should be updated in the config documentation.
92
const DEFAULT_HEALTH_CHECK_TIMEOUT_MS: u64 = 4000;
93
94
/// The default maximum number of chunk uploads per update.
95
/// Note: If this changes it should be updated in the config documentation.
96
pub const DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE: usize = 10;
97
98
/// The default COUNT value passed when scanning keys in Redis.
99
/// Note: If this changes it should be updated in the config documentation.
100
const DEFAULT_SCAN_COUNT: usize = 10_000;
101
102
/// The default COUNT value passed when scanning search indexes.
103
/// Note: If this changes it should be updated in the config documentation.
104
pub const DEFAULT_MAX_COUNT_PER_CURSOR: u64 = 1_500;
105
106
/// The default number of client permits (`max_client_permits`) if not specified.
const DEFAULT_CLIENT_PERMITS: usize = 500;
107
108
/// A wrapper around Redis to allow it to be reconnected.
109
pub trait RedisManager<C>
110
where
111
    C: ConnectionLike + Clone,
112
{
113
    /// Get a connection manager and a unique identifier for this connection
114
    /// which may be used to issue a reconnect later.
115
    fn get_connection(&self) -> impl Future<Output = Result<(C, Uuid), Error>> + Send;
116
117
    /// Reconnect if the uuid matches the uuid returned from `get_connection()`.
118
    fn reconnect(&self, uuid: Uuid) -> impl Future<Output = Result<(C, Uuid), Error>> + Send;
119
120
    /// Get an invocation of the update version script for a given `key`.
121
    fn update_script(&self, key: &str) -> redis::ScriptInvocation<'_>;
122
123
    /// Configure the connection to have a psubscribe on it and perform the
124
    /// subscription on reconnect.
125
    fn psubscribe(&self, pattern: &str) -> impl Future<Output = Result<(), Error>> + Send;
126
}
127
128
#[derive(Debug)]
129
pub struct ClusterRedisManager<C>
130
where
131
    C: ConnectionLike + Clone,
132
{
133
    /// A constant Uuid, we never reconnect.
134
    uuid: Uuid,
135
136
    /// Redis script used to update a value in redis if the version matches.
137
    /// This is done by incrementing the version number and then setting the new
138
    /// data only if the version number matches the existing version number.
139
    update_if_version_matches_script: Script,
140
141
    /// The client pool connecting to the backing Redis instance(s).
142
    connection_manager: C,
143
}
144
145
impl<C> ClusterRedisManager<C>
146
where
147
    C: ConnectionLike + Clone,
148
{
149
16
    pub async fn new(mut connection_manager: C) -> Result<Self, Error> {
150
16
        let update_if_version_matches_script = Script::new(LUA_VERSION_SET_SCRIPT);
151
16
        update_if_version_matches_script
152
16
            .load_async(&mut connection_manager)
153
16
            .await
?0
;
154
16
        Ok(Self {
155
16
            uuid: Uuid::new_v4(),
156
16
            update_if_version_matches_script,
157
16
            connection_manager,
158
16
        })
159
16
    }
160
}
161
162
impl<C> RedisManager<C> for ClusterRedisManager<C>
163
where
164
    C: ConnectionLike + Clone + Send + Sync,
165
{
166
34
    fn get_connection(&self) -> impl Future<Output = Result<(C, Uuid), Error>> + Send {
167
34
        future::ready(Ok((self.connection_manager.clone(), self.uuid)))
168
34
    }
169
170
0
    fn reconnect(&self, _uuid: Uuid) -> impl Future<Output = Result<(C, Uuid), Error>> + Send {
171
0
        self.get_connection()
172
0
    }
173
174
0
    fn update_script(&self, key: &str) -> redis::ScriptInvocation<'_> {
175
0
        self.update_if_version_matches_script.key(key)
176
0
    }
177
178
0
    fn psubscribe(&self, _pattern: &str) -> impl Future<Output = Result<(), Error>> + Send {
179
        // This is a no-op for cluster connections.
180
0
        future::ready(Ok(()))
181
0
    }
182
}
183
184
type RedisConnectFuture<C> = dyn Future<Output = Result<C, Error>> + Send;
185
type RedisConnectFn<C> = dyn Fn() -> Pin<Box<RedisConnectFuture<C>>> + Send + Sync;
186
187
pub struct StandardRedisManager<C>
188
where
189
    C: ConnectionLike + Clone,
190
{
191
    /// Function used to re-connect to Redis.
192
    connect_func: Box<RedisConnectFn<C>>,
193
194
    /// Redis script used to update a value in redis if the version matches.
195
    /// This is done by incrementing the version number and then setting the new
196
    /// data only if the version number matches the existing version number.
197
    update_if_version_matches_script: Script,
198
199
    /// The client pool connecting to the backing Redis instance(s) and a Uuid
200
    /// for this connection in order to avoid multiple reconnection attempts.
201
    connection_manager: tokio::sync::RwLock<(C, Uuid)>,
202
203
    /// A list of subscription that should be performed on reconnect.
204
    subscriptions: Mutex<HashSet<String>>,
205
}
206
207
impl<C> Debug for StandardRedisManager<C>
208
where
209
    C: ConnectionLike + Clone,
210
{
211
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
212
0
        f.debug_struct("StandardRedisManager")
213
0
            .field(
214
0
                "update_if_version_matches_script",
215
0
                &self.update_if_version_matches_script,
216
0
            )
217
0
            .field("subscriptions", &self.subscriptions)
218
0
            .finish()
219
0
    }
220
}
221
222
impl<C> StandardRedisManager<C>
223
where
224
    C: ConnectionLike + Clone + Send + Sync,
225
{
226
17
    async fn configure(&self, connection_manager: &mut C) -> Result<(), Error> {
227
17
        self.update_if_version_matches_script
228
17
            .load_async(connection_manager)
229
17
            .await
?0
;
230
17
        Ok(())
231
17
    }
232
233
18
    async fn new(connect_func: Box<RedisConnectFn<C>>) -> Result<Self, Error> {
234
18
        let 
connection_manager15
= connect_func().await
?3
;
235
15
        let update_if_version_matches_script = Script::new(LUA_VERSION_SET_SCRIPT);
236
15
        let connection = Self {
237
15
            connect_func,
238
15
            update_if_version_matches_script,
239
15
            connection_manager: tokio::sync::RwLock::new((connection_manager, Uuid::new_v4())),
240
15
            subscriptions: Mutex::new(HashSet::new()),
241
15
        };
242
        {
243
15
            let mut connection_manager = connection.connection_manager.write().await;
244
15
            connection.configure(&mut connection_manager.0).await
?0
;
245
        }
246
15
        Ok(connection)
247
18
    }
248
}
249
250
impl RedisManager<ConnectionManager> for StandardRedisManager<ConnectionManager> {
251
146
    async fn get_connection(&self) -> Result<(ConnectionManager, Uuid), Error> {
252
146
        Ok(self.connection_manager.read().await.clone())
253
146
    }
254
255
2
    async fn reconnect(&self, uuid: Uuid) -> Result<(ConnectionManager, Uuid), Error> {
256
2
        let mut guard = self.connection_manager.write().await;
257
2
        if guard.1 != uuid {
258
0
            let connection = guard.clone();
259
0
            drop(guard);
260
0
            return Ok(connection);
261
2
        }
262
2
        let mut connection_manager = (self.connect_func)().await
?0
;
263
2
        let uuid = Uuid::new_v4();
264
2
        self.configure(&mut connection_manager).await
?0
;
265
2
        let subscriptions = {
266
2
            let guard = self.subscriptions.lock();
267
2
            guard.iter().map(Clone::clone).collect::<Vec<_>>()
268
        };
269
2
        for 
subscription0
in subscriptions {
270
0
            connection_manager.psubscribe(&subscription).await?;
271
        }
272
2
        *guard = (connection_manager.clone(), uuid);
273
2
        Ok((connection_manager, uuid))
274
2
    }
275
276
18
    fn update_script(&self, key: &str) -> redis::ScriptInvocation<'_> {
277
18
        self.update_if_version_matches_script.key(key)
278
18
    }
279
280
3
    async fn psubscribe(&self, pattern: &str) -> Result<(), Error> {
281
3
        let mut connection = self.get_connection().await
?0
.0;
282
3
        let new_subscription = self.subscriptions.lock().insert(String::from(pattern));
283
3
        if new_subscription {
284
3
            let result = connection.psubscribe(pattern).await;
285
3
            if result.is_err() {
286
0
                self.subscriptions.lock().remove(pattern);
287
3
            }
288
3
            result
?0
;
289
0
        }
290
3
        Ok(())
291
3
    }
292
}
293
294
/// A [`StoreDriver`] implementation that uses Redis as a backing store.
295
#[derive(MetricsComponent)]
296
pub struct RedisStore<C, M>
297
where
298
    C: ConnectionLike + Clone,
299
    M: RedisManager<C>,
300
{
301
    /// The client pool connecting to the backing Redis instance(s).
302
    connection_manager: M,
303
304
    /// The underlying connection type in the connection manager.
305
    _connection_type: PhantomData<C>,
306
307
    /// A channel to publish updates to when a key is added, removed, or modified.
308
    #[metric(
309
        help = "The pubsub channel to publish updates to when a key is added, removed, or modified"
310
    )]
311
    pub_sub_channel: Option<String>,
312
313
    /// A function used to generate names for temporary keys.
314
    temp_name_generator_fn: fn() -> String,
315
316
    /// A common prefix to append to all keys before they are sent to Redis.
317
    ///
318
    /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`).
319
    #[metric(help = "Prefix to append to all keys before sending to Redis")]
320
    key_prefix: String,
321
322
    /// The amount of data to read from Redis at a time.
323
    #[metric(help = "The amount of data to read from Redis at a time")]
324
    read_chunk_size: usize,
325
326
    /// The maximum number of chunk uploads per update.
327
    /// This is used to limit the number of chunk uploads per update to prevent
328
    /// overloading when uploading large blocks of data
329
    #[metric(help = "The maximum number of chunk uploads per update")]
330
    max_chunk_uploads_per_update: usize,
331
332
    /// The COUNT value passed when scanning keys in Redis.
333
    /// This is used to hint the amount of work that should be done per response.
334
    #[metric(help = "The COUNT value passed when scanning keys in Redis")]
335
    scan_count: usize,
336
337
    /// The COUNT value used with search indexes
338
    #[metric(help = "The maximum number of results to return per cursor")]
339
    max_count_per_cursor: u64,
340
341
    /// A manager for subscriptions to keys in Redis.
342
    subscription_manager: tokio::sync::OnceCell<Arc<RedisSubscriptionManager>>,
343
344
    /// Channel for getting subscription messages
345
    subscriber_channel: Mutex<Option<UnboundedReceiver<PushInfo>>>,
346
347
    /// Permits to limit inflight Redis requests. Technically only
348
    /// limits the calls to `get_client()`, but the requests per client
349
    /// are small enough that it works well enough.
350
    client_permits: Arc<Semaphore>,
351
352
    /// Per-call ceiling for `check_health` PING.
353
    health_check_timeout: Duration,
354
}
355
356
impl<C, M> Debug for RedisStore<C, M>
357
where
358
    C: ConnectionLike + Clone,
359
    M: RedisManager<C>,
360
{
361
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
362
0
        f.debug_struct("RedisStore")
363
0
            .field("temp_name_generator_fn", &self.temp_name_generator_fn)
364
0
            .field("key_prefix", &self.key_prefix)
365
0
            .field("read_chunk_size", &self.read_chunk_size)
366
0
            .field(
367
0
                "max_chunk_uploads_per_update",
368
0
                &self.max_chunk_uploads_per_update,
369
0
            )
370
0
            .field("scan_count", &self.scan_count)
371
0
            .field("subscription_manager", &self.subscription_manager)
372
0
            .field("subscriber_channel", &self.subscriber_channel)
373
0
            .field("client_permits", &self.client_permits)
374
0
            .finish()
375
0
    }
376
}
377
378
struct ClientWithPermit<C: ConnectionLike> {
379
    connection_manager: C,
380
    uuid: Uuid,
381
382
    // here so it sticks around with the client and doesn't get dropped until that does
383
    #[allow(dead_code)]
384
    semaphore_permit: OwnedSemaphorePermit,
385
}
386
387
impl<C: ConnectionLike + Clone> ClientWithPermit<C> {
388
1
    async fn reconnect<M: RedisManager<C> + Sync>(&mut self, manager: &M) -> Result<(), Error> {
389
1
        (self.connection_manager, self.uuid) = manager.reconnect(self.uuid).await
?0
;
390
1
        Ok(())
391
1
    }
392
}
393
394
impl<C: ConnectionLike> Drop for ClientWithPermit<C> {
395
142
    fn drop(&mut self) {
396
142
        trace!(
397
142
            remaining = self.semaphore_permit.semaphore().available_permits(),
398
            "Dropping a client permit"
399
        );
400
142
    }
401
}
402
403
impl<C, M> RedisStore<C, M>
404
where
405
    C: ConnectionLike + Clone + Sync,
406
    M: RedisManager<C> + Sync,
407
{
408
    /// Used for testing when determinism is required.
409
    #[expect(clippy::too_many_arguments)]
410
31
    pub async fn new_from_builder_and_parts(
411
31
        pub_sub_channel: Option<String>,
412
31
        temp_name_generator_fn: fn() -> String,
413
31
        key_prefix: String,
414
31
        read_chunk_size: usize,
415
31
        max_chunk_uploads_per_update: usize,
416
31
        scan_count: usize,
417
31
        max_client_permits: usize,
418
31
        max_count_per_cursor: u64,
419
31
        health_check_timeout: Duration,
420
31
        subscriber_channel: UnboundedReceiver<PushInfo>,
421
31
        connection_manager: M,
422
31
    ) -> Result<Self, Error> {
423
31
        info!("Redis index fingerprint: {FINGERPRINT_CREATE_INDEX_HEX}");
424
425
31
        Ok(Self {
426
31
            connection_manager,
427
31
            _connection_type: PhantomData,
428
31
            pub_sub_channel,
429
31
            temp_name_generator_fn,
430
31
            key_prefix,
431
31
            read_chunk_size,
432
31
            max_chunk_uploads_per_update,
433
31
            scan_count,
434
31
            subscription_manager: tokio::sync::OnceCell::new(),
435
31
            subscriber_channel: Mutex::new(Some(subscriber_channel)),
436
31
            client_permits: Arc::new(Semaphore::new(max_client_permits)),
437
31
            max_count_per_cursor,
438
31
            health_check_timeout,
439
31
        })
440
31
    }
441
442
142
    async fn get_client(&self) -> Result<ClientWithPermit<C>, Error> {
443
142
        let local_client_permits = self.client_permits.clone();
444
142
        let remaining = local_client_permits.available_permits();
445
142
        let semaphore_permit = local_client_permits.acquire_owned().await
?0
;
446
142
        trace!(remaining, "Got a client permit");
447
142
        let (connection_manager, uuid) = self.connection_manager.get_connection().await
?0
;
448
142
        Ok(ClientWithPermit {
449
142
            connection_manager,
450
142
            uuid,
451
142
            semaphore_permit,
452
142
        })
453
142
    }
454
455
    /// Encode a [`StoreKey`] so it can be sent to Redis.
456
136
    fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> {
457
136
        let key_body = key.as_str();
458
136
        if self.key_prefix.is_empty() {
459
132
            key_body
460
        } else {
461
            // This is in the hot path for all redis operations, so we try to reuse the allocation
462
            // from `key.as_str()` if possible.
463
4
            match key_body {
464
4
                Cow::Owned(mut encoded_key) => {
465
4
                    encoded_key.insert_str(0, &self.key_prefix);
466
4
                    Cow::Owned(encoded_key)
467
                }
468
0
                Cow::Borrowed(body) => {
469
0
                    let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());
470
0
                    encoded_key.push_str(&self.key_prefix);
471
0
                    encoded_key.push_str(body);
472
0
                    Cow::Owned(encoded_key)
473
                }
474
            }
475
        }
476
136
    }
477
478
18
    fn set_spec_defaults(spec: &mut RedisSpec) -> Result<(), Error> {
479
18
        if spec.addresses.is_empty() {
480
0
            return Err(make_err!(
481
0
                Code::InvalidArgument,
482
0
                "No addresses were specified in redis store configuration."
483
0
            ));
484
18
        }
485
486
18
        if spec.broadcast_channel_capacity != 0 {
487
1
            warn!("broadcast_channel_capacity in Redis spec is deprecated and ignored");
488
17
        }
489
18
        if spec.response_timeout_s != 0 {
490
0
            warn!(
491
                "response_timeout_s in Redis spec is deprecated and ignored, use command_timeout_ms"
492
            );
493
18
        }
494
18
        if spec.connection_timeout_s != 0 {
495
0
            if spec.connection_timeout_ms != 0 {
496
0
                return Err(make_err!(
497
0
                    Code::InvalidArgument,
498
0
                    "Both connection_timeout_s and connection_timeout_ms were set, can only have one!"
499
0
                ));
500
0
            }
501
0
            warn!("connection_timeout_s in Redis spec is deprecated, use connection_timeout_ms");
502
0
            spec.connection_timeout_ms = spec.connection_timeout_s * 1000;
503
18
        }
504
18
        if spec.connection_timeout_ms == 0 {
505
13
            spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS;
506
13
        
}5
507
18
        if spec.command_timeout_ms == 0 {
508
16
            spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS;
509
16
        
}2
510
18
        if spec.health_check_timeout_ms == 0 {
511
18
            spec.health_check_timeout_ms = DEFAULT_HEALTH_CHECK_TIMEOUT_MS;
512
18
        
}0
513
18
        if spec.connection_pool_size == 0 {
514
18
            spec.connection_pool_size = DEFAULT_CONNECTION_POOL_SIZE;
515
18
        
}0
516
18
        if spec.read_chunk_size == 0 {
517
18
            spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
518
18
        
}0
519
18
        if spec.max_count_per_cursor == 0 {
520
18
            spec.max_count_per_cursor = DEFAULT_MAX_COUNT_PER_CURSOR;
521
18
        
}0
522
18
        if spec.max_chunk_uploads_per_update == 0 {
523
18
            spec.max_chunk_uploads_per_update = DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE;
524
18
        
}0
525
18
        if spec.scan_count == 0 {
526
18
            spec.scan_count = DEFAULT_SCAN_COUNT;
527
18
        
}0
528
18
        if spec.max_client_permits == 0 {
529
18
            spec.max_client_permits = DEFAULT_CLIENT_PERMITS;
530
18
        
}0
531
18
        if spec.retry.delay == 0.0 {
532
18
            spec.retry.delay = DEFAULT_RETRY_DELAY;
533
18
        
}0
534
18
        if spec.retry.max_retries == 0 {
535
18
            spec.retry.max_retries = 1;
536
18
        
}0
537
18
        trace!(?spec, "redis spec is after setting defaults");
538
18
        Ok(())
539
18
    }
540
541
    // Only used by tests, because we need to make a real redis connection, then fix this to get fixed values
542
6
    pub fn replace_temp_name_generator(&mut self, replacement: fn() -> String) {
543
6
        self.temp_name_generator_fn = replacement;
544
6
    }
545
}
546
547
impl RedisStore<ClusterConnection, ClusterRedisManager<ClusterConnection>> {
548
0
    pub async fn new_cluster(mut spec: RedisSpec) -> Result<Arc<Self>, Error> {
549
0
        if spec.mode != RedisMode::Cluster {
550
0
            return Err(Error::new(
551
0
                Code::InvalidArgument,
552
0
                "new_cluster only works for Cluster mode".to_string(),
553
0
            ));
554
0
        }
555
0
        Self::set_spec_defaults(&mut spec)?;
556
557
0
        let parsed_addrs: Vec<_> = spec
558
0
            .addresses
559
0
            .iter_mut()
560
0
            .map(|addr| {
561
0
                addr.clone().into_connection_info().map(|connection_info| {
562
0
                    let redis_settings = connection_info
563
0
                        .redis_settings()
564
0
                        .clone()
565
                        // We need RESP3 here because the cluster mode doesn't support RESP2 pubsub
566
                        // See also https://docs.rs/redis/latest/redis/cluster_async/index.html#pubsub
567
0
                        .set_protocol(redis::ProtocolVersion::RESP3);
568
0
                    connection_info.set_redis_settings(redis_settings)
569
0
                })
570
0
            })
571
0
            .collect::<Result<Vec<_>, _>>()?;
572
573
0
        let connection_timeout = Duration::from_millis(spec.connection_timeout_ms);
574
0
        let command_timeout = Duration::from_millis(spec.command_timeout_ms);
575
0
        let (tx, subscriber_channel) = unbounded_channel();
576
577
0
        let builder = ClusterClient::builder(parsed_addrs)
578
0
            .connection_timeout(connection_timeout)
579
0
            .response_timeout(command_timeout)
580
0
            .push_sender(tx)
581
0
            .retries(u32::try_from(spec.retry.max_retries)?);
582
583
0
        let client = builder.build()?;
584
585
0
        Self::new_from_builder_and_parts(
586
0
            spec.experimental_pub_sub_channel,
587
0
            || Uuid::new_v4().to_string(),
588
0
            spec.key_prefix.clone(),
589
0
            spec.read_chunk_size,
590
0
            spec.max_chunk_uploads_per_update,
591
0
            spec.scan_count,
592
0
            spec.max_client_permits,
593
0
            spec.max_count_per_cursor,
594
0
            Duration::from_millis(spec.health_check_timeout_ms),
595
0
            subscriber_channel,
596
0
            ClusterRedisManager::new(client.get_async_connection().await?).await?,
597
        )
598
0
        .await
599
0
        .map(Arc::new)
600
0
    }
601
}
602
603
impl RedisStore<ConnectionManager, StandardRedisManager<ConnectionManager>> {
604
20
    async fn connect(
605
20
        spec: RedisSpec,
606
20
        tx: UnboundedSender<PushInfo>,
607
20
    ) -> Result<ConnectionManager, Error> {
608
20
        let connection_timeout = Duration::from_millis(spec.connection_timeout_ms);
609
20
        let command_timeout = Duration::from_millis(spec.command_timeout_ms);
610
611
20
        let addr = &spec.addresses[0];
612
20
        let local_addr = addr.clone();
613
20
        let mut parsed_addr = local_addr
614
20
            .replace("redis+sentinel://", "redis://")
615
20
            .into_connection_info()
?0
;
616
617
20
        let redis_settings = parsed_addr
618
20
            .redis_settings()
619
20
            .clone()
620
            // We need RESP3 here because we want to do set_push_sender
621
20
            .set_protocol(redis::ProtocolVersion::RESP3);
622
20
        parsed_addr = parsed_addr.set_redis_settings(redis_settings);
623
20
        debug!(?parsed_addr, "Parsed redis addr");
624
625
20
        let 
client19
= timeout(
626
20
            connection_timeout,
627
20
            spawn!("connect", async move {
628
20
                match spec.mode {
629
11
                    RedisMode::Standard => Client::open(parsed_addr).map_err(Into::<Error>::into),
630
                    RedisMode::Cluster => {
631
0
                        return Err(Error::new(
632
0
                            Code::Internal,
633
0
                            "Use RedisStore::new_cluster for cluster connections".to_owned(),
634
0
                        ));
635
                    }
636
9
                    RedisMode::Sentinel => async {
637
9
                        let url_parsing = Url::parse(&local_addr)
?0
;
638
9
                        let master_name = url_parsing
639
9
                            .query_pairs()
640
9
                            .find(|(key, _)| 
key1
==
"sentinelServiceName"1
)
641
9
                            .map_or_else(|| 
"master"8
.
into8
(), |(_, value)|
value1
.
to_string1
());
642
643
9
                        let redis_connection_info = parsed_addr.redis_settings().clone();
644
9
                        let sentinel_connection_info = SentinelNodeConnectionInfo::default()
645
9
                            .set_redis_connection_info(redis_connection_info);
646
647
                        // We fish this out because sentinels don't support db, we need to set it
648
                        // on the client only. See also https://github.com/redis-rs/redis-rs/issues/1950
649
9
                        let original_db = parsed_addr.redis_settings().db();
650
9
                        if original_db != 0 {
651
1
                            // sentinel_connection_info has the actual DB set
652
1
                            let revised_settings = parsed_addr.redis_settings().clone().set_db(0);
653
1
                            parsed_addr = parsed_addr.set_redis_settings(revised_settings);
654
8
                        }
655
656
9
                        SentinelClient::build(
657
9
                            vec![parsed_addr],
658
9
                            master_name,
659
9
                            Some(sentinel_connection_info),
660
9
                            SentinelServerType::Master,
661
                        )
662
9
                        .map_err(Into::<Error>::into)
663
9
                    }
664
18
                    .
and_then9
(|mut s| async move
{9
Ok(
s9
.async_get_client().await) })
665
9
                    .await
?0
666
9
                    .map_err(Into::<Error>::into),
667
                }
668
20
                .err_tip_with_code(|_e| 
{1
669
1
                    (
670
1
                        Code::InvalidArgument,
671
1
                        format!("While connecting to redis with url: {local_addr}"),
672
1
                    )
673
1
                })
674
20
            }),
675
        )
676
20
        .await
677
20
        .err_tip(|| 
format!0
("Timeout while connecting to redis with url: {addr}"))
?0
?0
?1
;
678
679
19
        let connection_manager_config = {
680
19
            ConnectionManagerConfig::new()
681
19
                .set_number_of_retries(spec.retry.max_retries)
682
19
                .set_connection_timeout(Some(connection_timeout))
683
19
                .set_response_timeout(Some(command_timeout))
684
19
                .set_push_sender(tx)
685
        };
686
687
17
        let mut connection_manager =
688
19
            ConnectionManager::new_with_config(client, connection_manager_config)
689
19
                .await
690
19
                .err_tip(|| 
format!2
("While connecting to redis with url: {addr}"))
?2
;
691
692
17
        if let Some(
pub_sub_channel3
) = spec.experimental_pub_sub_channel {
693
3
            connection_manager.psubscribe(pub_sub_channel).await
?0
;
694
14
        }
695
696
17
        Ok(connection_manager)
697
20
    }
698
699
    /// Create a new `RedisStore` from the given configuration.
700
18
    pub async fn new_standard(mut spec: RedisSpec) -> Result<Arc<Self>, Error> {
701
18
        Self::set_spec_defaults(&mut spec)
?0
;
702
703
18
        if spec.addresses.len() != 1 {
704
0
            return Err(make_err!(
705
0
                Code::Unimplemented,
706
0
                "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discover to find the other nodes."
707
0
            ));
708
18
        }
709
710
18
        let (tx, subscriber_channel) = unbounded_channel();
711
712
15
        Self::new_from_builder_and_parts(
713
18
            spec.experimental_pub_sub_channel.clone(),
714
1
            || Uuid::new_v4().to_string(),
715
18
            spec.key_prefix.clone(),
716
18
            spec.read_chunk_size,
717
18
            spec.max_chunk_uploads_per_update,
718
18
            spec.scan_count,
719
18
            spec.max_client_permits,
720
18
            spec.max_count_per_cursor,
721
18
            Duration::from_millis(spec.health_check_timeout_ms),
722
18
            subscriber_channel,
723
20
            
StandardRedisManager::new18
(
Box::new18
(move || {
724
20
                Box::pin(Self::connect(spec.clone(), tx.clone()))
725
20
            }))
726
18
            .await
?3
,
727
        )
728
15
        .await
729
15
        .map(Arc::new)
730
18
    }
731
}
732
733
#[async_trait]
734
impl<C, M> StoreDriver for RedisStore<C, M>
735
where
736
    C: ConnectionLike + Clone + Send + Sync + Unpin + 'static,
737
    M: RedisManager<C> + Unpin + Send + Sync + 'static,
738
{
739
    async fn has_with_results(
740
        self: Pin<&Self>,
741
        keys: &[StoreKey<'_>],
742
        results: &mut [Option<u64>],
743
6
    ) -> Result<(), Error> {
744
        // TODO(palfrey) We could use pipeline here, but it makes retry more
745
        // difficult and it doesn't work very well in cluster mode.
746
        // If we wanted to optimize this with pipeline be careful to
747
        // implement retry and to support cluster mode.
748
749
        izip!(keys.iter(), results.iter_mut(),)
750
6
            .map(|(key, result)| async move {
751
                // We need to do a special pass to ensure our zero key exist.
752
6
                if is_zero_digest(key.borrow()) {
753
2
                    *result = Some(0);
754
2
                    return Ok::<_, Error>(());
755
4
                }
756
4
                let encoded_key = self.encode_key(key);
757
758
4
                let mut client = self.get_client().await
?0
;
759
760
                // Redis returns 0 when the key doesn't exist
761
                // AND when the key exists with value of length 0.
762
                // Therefore, we need to check both length and existence
763
                // and do it in a pipeline for efficiency
764
4
                let (blob_len, exists) = pipe()
765
4
                    .strlen(encoded_key.as_ref())
766
4
                    .exists(encoded_key.as_ref())
767
4
                    .query_async::<(u64, bool)>(&mut client.connection_manager)
768
4
                    .await
769
4
                    .err_tip(|| "In RedisStore::has_with_results::all")
?0
;
770
771
4
                *result = if exists { Some(blob_len) } else { 
None0
};
772
773
4
                Ok::<_, Error>(())
774
12
            })
775
            .collect::<FuturesUnordered<_>>()
776
            .try_collect()
777
            .await
778
6
    }
779
780
    async fn list(
781
        self: Pin<&Self>,
782
        range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
783
        handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
784
8
    ) -> Result<u64, Error> {
785
        let range = (
786
            range.0.map(StoreKey::into_owned),
787
            range.1.map(StoreKey::into_owned),
788
        );
789
        let pattern = match range.0 {
790
            Bound::Included(ref start) | Bound::Excluded(ref start) => match range.1 {
791
                Bound::Included(ref end) | Bound::Excluded(ref end) => {
792
                    let start = start.as_str();
793
                    let end = end.as_str();
794
                    let max_length = start.len().min(end.len());
795
                    let length = start
796
                        .chars()
797
                        .zip(end.chars())
798
20
                        .position(|(a, b)| a != b)
799
                        .unwrap_or(max_length);
800
                    format!("{}{}*", self.key_prefix, &start[..length])
801
                }
802
                Bound::Unbounded => format!("{}*", self.key_prefix),
803
            },
804
            Bound::Unbounded => format!("{}*", self.key_prefix),
805
        };
806
        let mut client = self.get_client().await?;
807
        trace!(%pattern, count=self.scan_count, "Running SCAN");
808
        let opts = ScanOptions::default()
809
            .with_pattern(pattern)
810
            .with_count(self.scan_count);
811
        let mut scan_stream: AsyncIter<Value> = client
812
            .connection_manager
813
            .scan_options(opts)
814
            .await
815
            .err_tip(|| "During scan_options")?;
816
        let mut iterations = 0;
817
        let mut errors = vec![];
818
        while let Some(key) = scan_stream.next_item().await {
819
            if let Ok(Value::BulkString(raw_key)) = key {
820
                let Ok(str_key) = str::from_utf8(&raw_key) else {
821
                    error!(?raw_key, "Non-utf8 key");
822
                    errors.push(format!("Non-utf8 key {raw_key:?}"));
823
                    continue;
824
                };
825
                if let Some(key) = str_key.strip_prefix(&self.key_prefix) {
826
                    let key = StoreKey::new_str(key);
827
                    if range.contains(&key) {
828
                        iterations += 1;
829
                        if !handler(&key) {
830
                            error!("Issue in handler");
831
                            errors.push("Issue in handler".to_string());
832
                        }
833
                    } else {
834
                        trace!(%key, ?range, "Key not in range");
835
                    }
836
                } else {
837
                    errors.push("Key doesn't match prefix".to_string());
838
                }
839
            } else {
840
                error!(?key, "Non-string in key");
841
                errors.push("Non-string in key".to_string());
842
            }
843
        }
844
        if errors.is_empty() {
845
            Ok(iterations)
846
        } else {
847
            error!(?errors, "Errors in scan stream");
848
            Err(Error::new(Code::Internal, format!("Errors: {errors:?}")))
849
        }
850
8
    }
851
852
    async fn update(
853
        self: Pin<&Self>,
854
        key: StoreKey<'_>,
855
        mut reader: DropCloserReadHalf,
856
        _upload_size: UploadSizeInfo,
857
9
    ) -> Result<u64, Error> {
858
        let final_key = self.encode_key(&key);
859
860
        // While the name generation function can be supplied by the user, we need to have the curly
861
        // braces in place in order to manage redis' hashing behavior and make sure that the temporary
862
        // key name and the final key name are directed to the same cluster node. See
863
        // https://redis.io/blog/redis-clustering-best-practices-with-keys/
864
        //
865
        // The TL;DR is that if we're in cluster mode and the names hash differently, we can't use request
866
        // pipelining. By using these braces, we tell redis to only hash the part of the temporary key that's
867
        // identical to the final key -- so they will always hash to the same node.
868
        let temp_key = format!(
869
            "temp-{}-{{{}}}",
870
            (self.temp_name_generator_fn)(),
871
            &final_key
872
        );
873
874
        if is_zero_digest(key.borrow()) {
875
            let chunk = reader
876
                .peek()
877
                .await
878
                .err_tip(|| "Failed to peek in RedisStore::update")?;
879
            if chunk.is_empty() {
880
                reader
881
                    .drain()
882
                    .await
883
                    .err_tip(|| "Failed to drain in RedisStore::update")?;
884
                // Zero-digest keys are special -- we don't need to do anything with it.
885
                return Ok(0);
886
            }
887
        }
888
889
        let mut client = self.get_client().await?;
890
891
        let mut read_stream = reader
892
8
            .scan(0u32, |bytes_read, chunk_res| {
893
8
                future::ready(Some(
894
8
                    chunk_res
895
8
                        .err_tip(|| "Failed to read chunk in update in redis store")
896
8
                        .and_then(|chunk| 
{7
897
7
                            let offset = isize::try_from(*bytes_read).err_tip(|| "Could not convert offset to isize in RedisStore::update")
?0
;
898
7
                            let chunk_len = u32::try_from(chunk.len()).err_tip(
899
                                || "Could not convert chunk length to u32 in RedisStore::update",
900
0
                            )?;
901
7
                            let new_bytes_read = bytes_read
902
7
                                .checked_add(chunk_len)
903
7
                                .err_tip(|| "Overflow protection in RedisStore::update")
?0
;
904
7
                            *bytes_read = new_bytes_read;
905
7
                            Ok::<_, Error>((offset, *bytes_read, chunk))
906
7
                        }),
907
                ))
908
8
            })
909
8
            .map(|res| {
910
8
                let (
offset7
,
end_pos7
,
chunk7
) = res
?1
;
911
7
                let temp_key_ref = &temp_key;
912
7
                Ok(async move {
913
7
                    let (mut connection_manager, connect_id) = self.connection_manager.get_connection().await
?0
;
914
7
                    match connection_manager
915
7
                        .setrange::<_, _, usize>(temp_key_ref, offset, chunk.to_vec())
916
7
                        .await {
917
5
                        Ok(_) => {},
918
1
                        Err(err)
919
2
                            if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly
)1
=>
920
                        {
921
1
                            let (mut connection_manager, _connect_id) = self.connection_manager.reconnect(connect_id).await
?0
;
922
1
                            connection_manager
923
1
                                .setrange::<_, _, usize>(temp_key_ref, offset, chunk.to_vec())
924
1
                                .await
925
1
                                .err_tip(
926
0
                                    || format!("(after reconnect) while appending to temp key ({temp_key_ref}) in RedisStore::update. offset = {offset}. end_pos = {end_pos}"),
927
0
                                )?;
928
                        }
929
1
                        Err(err) => {
930
1
                            let mut error: Error = err.into();
931
1
                            error
932
1
                                .messages
933
1
                                .push(format!("While appending to temp key ({temp_key_ref}) in RedisStore::update. offset = {offset}. end_pos = {end_pos}"));
934
1
                            return Err(error);
935
                        }
936
                    }
937
6
                    Ok::<u32, Error>(end_pos)
938
7
                })
939
8
            })
940
            .try_buffer_unordered(self.max_chunk_uploads_per_update);
941
942
        let mut total_len: u32 = 0;
943
        while let Some(last_pos) = read_stream.try_next().await? {
944
            if last_pos > total_len {
945
                total_len = last_pos;
946
            }
947
        }
948
949
        let blob_len: usize = client
950
            .connection_manager
951
            .strlen(&temp_key)
952
            .await
953
0
            .err_tip(|| format!("In RedisStore::update strlen check for {temp_key}"))?;
954
        // This is a safety check to ensure that in the event some kind of retry was to happen
955
        // and the data was appended to the key twice, we reject the data.
956
        if blob_len != usize::try_from(total_len).unwrap_or(usize::MAX) {
957
            return Err(make_input_err!(
958
                "Data length mismatch in RedisStore::update for {}({}) - expected {} bytes, got {} bytes",
959
                key.borrow().as_str(),
960
                temp_key,
961
                total_len,
962
                blob_len,
963
            ));
964
        }
965
966
        // Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost.
967
        client
968
            .connection_manager
969
            .rename::<_, _, ()>(&temp_key, final_key.as_ref())
970
            .await
971
            .err_tip(|| "While queueing key rename in RedisStore::update()")?;
972
973
        // If we have a publish channel configured, send a notice that the key has been set.
974
        if let Some(pub_sub_channel) = &self.pub_sub_channel {
975
            client
976
                .connection_manager
977
                .publish::<_, _, ()>(pub_sub_channel, final_key.as_ref())
978
                .await?;
979
        }
980
981
        Ok(blob_len.try_into().unwrap_or(0))
982
9
    }
983
984
    async fn get_part(
985
        self: Pin<&Self>,
986
        key: StoreKey<'_>,
987
        writer: &mut DropCloserWriteHalf,
988
        offset: u64,
989
        length: Option<u64>,
990
5
    ) -> Result<(), Error> {
991
        let offset = isize::try_from(offset).err_tip(|| "Could not convert offset to isize")?;
992
        let length = length
993
4
            .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
994
            .transpose()?;
995
996
        // To follow RBE spec we need to consider any digest's with
997
        // zero size to be existing.
998
        if is_zero_digest(key.borrow()) {
999
            return writer
1000
                .send_eof()
1001
                .err_tip(|| "Failed to send zero EOF in redis store get_part");
1002
        }
1003
1004
        let encoded_key = self.encode_key(&key);
1005
        let encoded_key = encoded_key.as_ref();
1006
1007
        // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we
1008
        // do math with indices we change them to be exclusive at the end.
1009
1010
        // We want to read the data at the key from `offset` to `offset + length`.
1011
        let data_start = offset;
1012
        let data_end = data_start
1013
            .saturating_add(length.unwrap_or(isize::MAX as usize) as isize)
1014
            .saturating_sub(1);
1015
1016
        // And we don't ever want to read more than `read_chunk_size` bytes at a time, so we'll need to iterate.
1017
        let mut chunk_start = data_start;
1018
        let mut chunk_end = cmp::min(
1019
            data_start.saturating_add(self.read_chunk_size as isize) - 1,
1020
            data_end,
1021
        );
1022
1023
        let mut client = self.get_client().await?;
1024
        loop {
1025
            let chunk: Bytes = client
1026
                .connection_manager
1027
                .getrange(encoded_key, chunk_start, chunk_end)
1028
                .await
1029
                .err_tip(|| "In RedisStore::get_part::getrange")?;
1030
1031
            let didnt_receive_full_chunk = chunk.len() < self.read_chunk_size;
1032
            let reached_end_of_data = chunk_end == data_end;
1033
1034
            if didnt_receive_full_chunk || reached_end_of_data {
1035
                if !chunk.is_empty() {
1036
                    writer
1037
                        .send(chunk)
1038
                        .await
1039
                        .err_tip(|| "Failed to write data in RedisStore::get_part")?;
1040
                }
1041
1042
                break; // No more data to read.
1043
            }
1044
1045
            // We received a full chunk's worth of data, so write it...
1046
            writer
1047
                .send(chunk)
1048
                .await
1049
                .err_tip(|| "Failed to write data in RedisStore::get_part")?;
1050
1051
            // ...and go grab the next chunk.
1052
            chunk_start = chunk_end + 1;
1053
            chunk_end = cmp::min(
1054
                chunk_start.saturating_add(self.read_chunk_size as isize) - 1,
1055
                data_end,
1056
            );
1057
        }
1058
1059
        // If we didn't write any data, check if the key exists, if not return a NotFound error.
1060
        // This is required by spec.
1061
        if writer.get_bytes_written() == 0 {
1062
            // We're supposed to read 0 bytes, so just check if the key exists.
1063
            let exists: bool = client
1064
                .connection_manager
1065
                .exists(encoded_key)
1066
                .await
1067
                .err_tip(|| "In RedisStore::get_part::zero_exists")?;
1068
1069
            if !exists {
1070
                return Err(make_err!(
1071
                    Code::NotFound,
1072
                    "Data not found in Redis store for digest: {key:?}"
1073
                ));
1074
            }
1075
        }
1076
1077
        writer
1078
            .send_eof()
1079
            .err_tip(|| "Failed to write EOF in redis store get_part")
1080
5
    }
1081
1082
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
1083
0
        self
1084
0
    }
1085
1086
0
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) {
1087
0
        self
1088
0
    }
1089
1090
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> {
1091
0
        self
1092
0
    }
1093
1094
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
1095
0
        registry.register_indicator(self);
1096
0
    }
1097
1098
0
    fn register_remove_callback(
1099
0
        self: Arc<Self>,
1100
0
        _callback: Arc<dyn RemoveItemCallback>,
1101
0
    ) -> Result<(), Error> {
1102
        // As redis doesn't drop stuff, we can just ignore this
1103
0
        Ok(())
1104
0
    }
1105
}
1106
1107
#[async_trait]
1108
impl<C, M> HealthStatusIndicator for RedisStore<C, M>
1109
where
1110
    C: ConnectionLike + Clone + Send + Sync + Unpin + 'static,
1111
    M: RedisManager<C> + Send + Sync + Unpin + 'static,
1112
{
1113
1
    fn get_name(&self) -> &'static str {
1114
1
        "RedisStore"
1115
1
    }
1116
1117
    /// Lightweight health check: just `PING` the master, bounded by a
1118
    /// short physical timeout. The default `StoreDriver::check_health`
1119
    /// performs a full `update_oneshot` + `has` + `get_part_unchunked`
1120
    /// roundtrip, which queues behind real production traffic on the
1121
    /// same connection-permit semaphore and Redis master. When the
1122
    /// store is even moderately loaded that easily exceeds the
1123
    /// `HealthServer` per-indicator budget (default 5 s), each
1124
    /// RedisStore-backed indicator (AC, small-blob CAS, scheduler)
1125
    /// reports `HealthStatus::Timeout`, and `/status` returns 503 —
1126
    /// surfaced as a readiness-probe failure that sheds traffic from
1127
    /// an otherwise-functional pod. A `PING` proves the connection
1128
    /// is reachable and the master is accepting commands; that is
1129
    /// the only invariant a kubelet probe needs.
1130
0
    async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus {
1131
        let mut client = match self.get_client().await {
1132
            Ok(c) => c,
1133
            Err(e) => {
1134
                return HealthStatus::new_failed(
1135
                    self,
1136
                    format!("RedisStore::check_health: failed to acquire connection: {e}").into(),
1137
                );
1138
            }
1139
        };
1140
1141
        // Hold the `ClientWithPermit` for the duration of the call so
1142
        // its `Drop` releases the semaphore permit on exit. We just
1143
        // need a `&mut` to the connection manager underneath.
1144
0
        let ping = async {
1145
0
            redis::cmd("PING")
1146
0
                .query_async::<()>(&mut client.connection_manager)
1147
0
                .await
1148
0
        };
1149
        match timeout(self.health_check_timeout, ping).await {
1150
            Ok(Ok(())) => HealthStatus::new_ok(self, "RedisStore::check_health: PING ok".into()),
1151
            Ok(Err(e)) => HealthStatus::new_failed(
1152
                self,
1153
                format!("RedisStore::check_health: PING errored: {e}").into(),
1154
            ),
1155
            Err(_) => HealthStatus::new_failed(
1156
                self,
1157
                format!(
1158
                    "RedisStore::check_health: PING exceeded {}ms timeout",
1159
                    self.health_check_timeout.as_millis()
1160
                )
1161
                .into(),
1162
            ),
1163
        }
1164
0
    }
1165
}
1166
1167
// -------------------------------------------------------------------
1168
// Below this line are specific to the redis scheduler implementation.
1169
// -------------------------------------------------------------------
1170
1171
/// The time in milliseconds that a redis cursor can be idle before it is closed.
1172
const CURSOR_IDLE_MS: u64 = 30_000;
1173
/// The name of the field in the Redis hash that stores the data.
1174
const DATA_FIELD_NAME: &str = "data";
1175
/// The name of the field in the Redis hash that stores the version.
1176
const VERSION_FIELD_NAME: &str = "version";
1177
/// The time to live of indexes in seconds. After this time redis may delete the index.
1178
const INDEX_TTL_S: u64 = 60 * 60 * 24; // 24 hours.
1179
1180
#[allow(rustdoc::broken_intra_doc_links)]
1181
/// Lua script to set a key if the version matches.
1182
/// Args:
1183
///   KEYS[1]: The key where the version is stored.
1184
///   ARGV[1]: The expected version.
1185
///   ARGV[2]: TTL in seconds, or 0 for forever
1186
///   ARGV[3]: The new data.
1187
///   ARGV[4*]: Key-value pairs of additional data to include.
1188
/// Returns:
1189
///   The new version if the version matches. nil is returned if the
1190
///   value was not set.
1191
pub const LUA_VERSION_SET_SCRIPT: &str = formatcp!(
1192
    r"
1193
local key = KEYS[1]
1194
local expected_version = tonumber(ARGV[1])
1195
local ttl = tonumber(ARGV[2])
1196
local new_data = ARGV[3]
1197
local new_version = redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', 1)
1198
local i
1199
local indexes = {{}}
1200
1201
if new_version-1 ~= expected_version then
1202
    redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', -1)
1203
    return {{ 0, new_version-1 }}
1204
end
1205
-- Skip first 3 argvs, as they are known inputs.
1206
-- Remember: Lua is 1-indexed.
1207
for i=4, #ARGV do
1208
    indexes[i-3] = ARGV[i]
1209
end
1210
1211
-- In testing we witnessed redis sometimes not update our FT indexes
1212
-- resulting in stale data. It appears if we delete our keys then insert
1213
-- them again it works and reduces risk significantly.
1214
redis.call('DEL', key)
1215
redis.call('HSET', key, '{DATA_FIELD_NAME}', new_data, '{VERSION_FIELD_NAME}', new_version, unpack(indexes))
1216
1217
if ttl ~= 0 then
1218
    redis.call('EXPIRE', key, ttl)
1219
end
1220
return {{ 1, new_version }}
1221
"
1222
);
1223
1224
/// This is the output of the calculations below hardcoded into the executable.
1225
const FINGERPRINT_CREATE_INDEX_HEX: &str = "3e762c15";
1226
1227
#[cfg(test)]
1228
mod test {
1229
    use super::FINGERPRINT_CREATE_INDEX_HEX;
1230
1231
    /// String of the `FT.CREATE` command used to create the index template.
1232
    const CREATE_INDEX_TEMPLATE: &str = "FT.CREATE {} ON HASH PREFIX 1 {} NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA {} TAG CASESENSITIVE SORTABLE";
1233
1234
    /// Compile-time fingerprint of the `FT.CREATE` command used to create the
1235
    /// index template. This is a simple CRC32 checksum of the command string.
1236
    /// We don't care about it actually being a valid CRC32 checksum, just that
1237
    /// it's a unique identifier with a low chance of collision.
1238
1
    const fn fingerprint_create_index_template() -> u32 {
1239
        const POLY: u32 = 0xEDB8_8320;
1240
        const DATA: &[u8] = CREATE_INDEX_TEMPLATE.as_bytes();
1241
1
        let mut crc = 0xFFFF_FFFF;
1242
1
        let mut i = 0;
1243
102
        while i < DATA.len() {
1244
101
            let byte = DATA[i];
1245
101
            crc ^= byte as u32;
1246
1247
101
            let mut j = 0;
1248
909
            while j < 8 {
1249
808
                crc = if crc & 1 != 0 {
1250
386
                    (crc >> 1) ^ POLY
1251
                } else {
1252
422
                    crc >> 1
1253
                };
1254
808
                j += 1;
1255
            }
1256
101
            i += 1;
1257
        }
1258
1
        crc
1259
1
    }
1260
1261
    /// Verify that our calculation always evaluates to this fixed value.
1262
    #[test]
1263
1
    fn test_fingerprint_value() {
1264
1
        assert_eq!(
1265
1
            format!("{:08x}", &fingerprint_create_index_template()),
1266
            FINGERPRINT_CREATE_INDEX_HEX,
1267
        );
1268
1
    }
1269
}
1270
1271
/// Get the name of the index to create for the given field.
1272
/// This will add some prefix data to the name to try and ensure
1273
/// if the index definition changes, the name will get a new name.
1274
macro_rules! get_index_name {
1275
    ($prefix:expr, $field:expr, $maybe_sort:expr) => {
1276
        format_args!(
1277
            "{}_{}_{}_{}",
1278
            $prefix,
1279
            $field,
1280
            $maybe_sort.unwrap_or(""),
1281
            FINGERPRINT_CREATE_INDEX_HEX
1282
        )
1283
    };
1284
}
1285
1286
/// Try to sanitize a string to be used as a Redis key.
1287
/// We don't actually modify the string, just check if it's valid.
1288
28
const fn try_sanitize(s: &str) -> bool {
1289
    // Note: We cannot use for loops or iterators here because they are not const.
1290
    // Allowing us to use a const function here gives the compiler the ability to
1291
    // optimize this function away entirely in the case where the input is constant.
1292
28
    let chars = s.as_bytes();
1293
28
    let mut i: usize = 0;
1294
28
    let len = s.len();
1295
    loop {
1296
546
        if i >= len {
1297
28
            break;
1298
518
        }
1299
518
        let c = chars[i];
1300
518
        if !c.is_ascii_alphanumeric() && 
c != b'_'20
{
1301
0
            return false;
1302
518
        }
1303
518
        i += 1;
1304
    }
1305
28
    true
1306
28
}
1307
1308
/// An individual subscription to a key in Redis.
1309
#[derive(Debug)]
1310
pub struct RedisSubscription {
1311
    receiver: Option<tokio::sync::watch::Receiver<String>>,
1312
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
1313
}
1314
1315
impl SchedulerSubscription for RedisSubscription {
1316
    /// Wait for the subscription key to change.
1317
20
    async fn changed(&mut self) -> Result<(), Error> {
1318
20
        let receiver = self
1319
20
            .receiver
1320
20
            .as_mut()
1321
20
            .ok_or_else(|| 
make_err!0
(
Code::Internal0
, "In RedisSubscription::changed::as_mut"))
?0
;
1322
20
        receiver.changed().await.
map_err17
(|err|
{0
1323
0
            Error::from_std_err(Code::Internal, &err)
1324
0
                .append("In RedisSubscription::changed::changed")
1325
0
        })
1326
17
    }
1327
}
1328
1329
// If the subscription is dropped, we need to possibly remove the key from the
1330
// subscribed keys map.
1331
impl Drop for RedisSubscription {
1332
410
    fn drop(&mut self) {
1333
410
        let Some(receiver) = self.receiver.take() else {
1334
0
            warn!("RedisSubscription has already been dropped, nothing to do.");
1335
0
            return; // Already dropped, nothing to do.
1336
        };
1337
410
        let key = receiver.borrow().clone();
1338
410
        let Some(
subscribed_keys408
) = self.weak_subscribed_keys.upgrade() else {
1339
2
            return; // Parent dropped — nothing to do.
1340
        };
1341
408
        let mut subscribed_keys = subscribed_keys.write();
1342
408
        let Some(publisher) = subscribed_keys.get(&key) else {
1343
0
            warn!(
1344
                %key,
1345
                "RedisSubscription::drop: key absent from subscribed_keys under write lock — \
1346
                 indicates an unexpected removal path",
1347
            );
1348
0
            return;
1349
        };
1350
        // Count includes our own (still-alive) receiver. If we are the
1351
        // sole subscriber, remove the publisher entry.
1352
408
        if publisher.receiver_count() == 1 {
1353
206
            subscribed_keys.remove(&key);
1354
206
        
}202
1355
408
        drop(receiver);
1356
410
    }
1357
}
1358
1359
/// A publisher for a key in Redis.
1360
#[derive(Debug)]
1361
struct RedisSubscriptionPublisher {
1362
    sender: Mutex<tokio::sync::watch::Sender<String>>,
1363
}
1364
1365
impl RedisSubscriptionPublisher {
1366
208
    fn new(
1367
208
        key: String,
1368
208
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
1369
208
    ) -> (Self, RedisSubscription) {
1370
208
        let (sender, receiver) = tokio::sync::watch::channel(key);
1371
208
        let publisher = Self {
1372
208
            sender: Mutex::new(sender),
1373
208
        };
1374
208
        let subscription = RedisSubscription {
1375
208
            receiver: Some(receiver),
1376
208
            weak_subscribed_keys,
1377
208
        };
1378
208
        (publisher, subscription)
1379
208
    }
1380
1381
202
    fn subscribe(
1382
202
        &self,
1383
202
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
1384
202
    ) -> RedisSubscription {
1385
202
        let receiver = self.sender.lock().subscribe();
1386
202
        RedisSubscription {
1387
202
            receiver: Some(receiver),
1388
202
            weak_subscribed_keys,
1389
202
        }
1390
202
    }
1391
1392
408
    fn receiver_count(&self) -> usize {
1393
408
        self.sender.lock().receiver_count()
1394
408
    }
1395
1396
18
    fn notify(&self) {
1397
        // TODO(https://github.com/sile/patricia_tree/issues/40) When this is addressed
1398
        // we can remove the `Mutex` and use the mutable iterator directly.
1399
18
        self.sender.lock().send_modify(|_| {});
1400
18
    }
1401
}
1402
1403
#[derive(Debug, Clone)]
1404
pub struct RedisSubscriptionManager {
1405
    subscribed_keys: Arc<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
1406
    tx_for_test: UnboundedSender<String>,
1407
    _subscription_spawn: Arc<Mutex<JoinHandleDropGuard<()>>>,
1408
}
1409
1410
impl RedisSubscriptionManager {
1411
9
    pub fn new(subscriber_channel: UnboundedReceiver<PushInfo>) -> Self {
1412
9
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
1413
9
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);
1414
9
        let (tx_for_test, mut rx_for_test) = unbounded_channel();
1415
9
        let mut local_subscriber_channel = UnboundedReceiverStream::new(subscriber_channel);
1416
        Self {
1417
9
            subscribed_keys,
1418
9
            tx_for_test,
1419
9
            _subscription_spawn: Arc::new(Mutex::new(spawn!(
1420
                "redis_subscribe_spawn",
1421
7
                async move {
1422
                    loop {
1423
                        loop {
1424
23
                            let 
key15
= select! {
1425
23
                                
value14
= rx_for_test.recv() => {
1426
14
                                    let Some(value) = value else {
1427
0
                                        unreachable!("Channel should never close");
1428
                                    };
1429
14
                                    value
1430
                                },
1431
23
                                
maybe_push_info2
= local_subscriber_channel.next() => {
1432
2
                                    if let Some(push_info) = maybe_push_info {
1433
2
                                        match push_info.kind {
1434
1
                                            redis::PushKind::PMessage => {},
1435
                                            redis::PushKind::PSubscribe => {
1436
1
                                                trace!(?push_info, "PSubscribe, ignore");
1437
1
                                                continue;
1438
                                            }
1439
                                            _ => {
1440
0
                                                warn!(?push_info, "Other push_info message, discarded");
1441
0
                                                continue;
1442
                                            },
1443
                                        }
1444
1
                                        if push_info.data.len() != 3 {
1445
0
                                            error!(?push_info, "Expected exactly 3 values on subscriber channel (pattern, channel, value)");
1446
0
                                            continue;
1447
1
                                        }
1448
1
                                        match push_info.data.last().unwrap() {
1449
0
                                            Value::SimpleString(s) => {
1450
0
                                                s.clone()
1451
                                            }
1452
1
                                            Value::BulkString(v) => {
1453
1
                                                String::from_utf8(v.clone()).expect("String message")
1454
                                            }
1455
0
                                            other => {
1456
0
                                                error!(?other, "Received non-string message in RedisSubscriptionManager");
1457
0
                                                continue;
1458
                                            }
1459
                                        }
1460
                                    } else {
1461
0
                                        error!("Error receiving message in RedisSubscriptionManager from subscriber_channel");
1462
0
                                        break;
1463
                                    }
1464
                                }
1465
                            };
1466
15
                            trace!(key, "New subscription manager key");
1467
15
                            let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
1468
0
                                warn!(
1469
                                    "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
1470
                                );
1471
0
                                return;
1472
                            };
1473
15
                            let subscribed_keys_mux = subscribed_keys.read();
1474
15
                            subscribed_keys_mux
1475
15
                                .common_prefix_values(&*key)
1476
15
                                .for_each(RedisSubscriptionPublisher::notify);
1477
                        }
1478
                        // Sleep for a small amount of time to ensure we don't reconnect too quickly.
1479
0
                        sleep(Duration::from_secs(1)).await;
1480
                        // If we reconnect or lag behind we might have had dirty keys, so we need to
1481
                        // flag all of them as changed.
1482
0
                        let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
1483
0
                            warn!(
1484
                                "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
1485
                            );
1486
0
                            return;
1487
                        };
1488
0
                        let subscribed_keys_mux = subscribed_keys.read();
1489
                        // Just in case also get a new receiver.
1490
0
                        for publisher in subscribed_keys_mux.values() {
1491
0
                            publisher.notify();
1492
0
                        }
1493
                    }
1494
0
                }
1495
            ))),
1496
        }
1497
9
    }
1498
}
1499
1500
impl SubscriptionManagerNotify for RedisSubscriptionManager {
    /// Inject `value` into the background task as if it were a key received
    /// from Redis.
    ///
    /// # Panics
    ///
    /// Panics if the background task's receiving end has been closed.
    fn notify_for_test(&self, value: String) {
        let send_result = self.tx_for_test.send(value);
        send_result.unwrap();
    }
}
1505
1506
impl SchedulerSubscriptionManager for RedisSubscriptionManager {
1507
    type Subscription = RedisSubscription;
1508
1509
410
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
1510
410
    where
1511
410
        K: SchedulerStoreKeyProvider,
1512
    {
1513
410
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
1514
410
        let mut subscribed_keys = self.subscribed_keys.write();
1515
410
        let key = key.get_key();
1516
410
        let key_str = key.as_str();
1517
410
        let mut subscription = if let Some(
publisher202
) = subscribed_keys.get(&key_str) {
1518
202
            publisher.subscribe(weak_subscribed_keys)
1519
        } else {
1520
208
            let (publisher, subscription) =
1521
208
                RedisSubscriptionPublisher::new(key_str.to_string(), weak_subscribed_keys);
1522
208
            subscribed_keys.insert(key_str, publisher);
1523
208
            subscription
1524
        };
1525
410
        subscription
1526
410
            .receiver
1527
410
            .as_mut()
1528
410
            .ok_or_else(|| 
{0
1529
0
                make_err!(
1530
0
                    Code::Internal,
1531
                    "Receiver should be set in RedisSubscriptionManager::subscribe"
1532
                )
1533
0
            })?
1534
410
            .mark_changed();
1535
1536
410
        Ok(subscription)
1537
410
    }
1538
1539
1
    fn is_reliable() -> bool {
1540
1
        false
1541
1
    }
1542
}
1543
1544
impl<C, M> SchedulerStore for RedisStore<C, M>
1545
where
1546
    C: Clone + ConnectionLike + Sync + Send + 'static,
1547
    M: RedisManager<C> + Sync + Send + 'static,
1548
{
1549
    type SubscriptionManager = RedisSubscriptionManager;
1550
1551
6
    async fn subscription_manager(&self) -> Result<Arc<RedisSubscriptionManager>, Error> {
1552
6
        self.subscription_manager
1553
6
            .get_or_try_init(|| async move 
{3
1554
3
                let Some(subscriber_channel) = self.subscriber_channel.lock().take() else {
1555
0
                    return Err(make_input_err!(
1556
0
                        "Multiple attempts to obtain the subscription manager in RedisStore"
1557
0
                    ));
1558
                };
1559
3
                let Some(pub_sub_channel) = &self.pub_sub_channel else {
1560
0
                    return Err(make_input_err!(
1561
0
                        "RedisStore must have a pubsub for Redis Scheduler if using subscriptions"
1562
0
                    ));
1563
                };
1564
3
                self.connection_manager.psubscribe(pub_sub_channel).await
?0
;
1565
3
                Ok(Arc::new(RedisSubscriptionManager::new(subscriber_channel)))
1566
6
            })
1567
6
            .await
1568
6
            .map(Clone::clone)
1569
6
    }
1570
1571
24
    async fn update_data<T>(&self, data: T, expiry: Option<Duration>) -> Result<Option<i64>, Error>
1572
24
    where
1573
24
        T: SchedulerStoreDataProvider
1574
24
            + SchedulerStoreKeyProvider
1575
24
            + SchedulerCurrentVersionProvider
1576
24
            + Send,
1577
24
    {
1578
24
        let key = data.get_key();
1579
24
        let redis_key = self.encode_key(&key);
1580
24
        let mut client = self.get_client().await
?0
;
1581
24
        let maybe_index = data.get_indexes().err_tip(|| 
{0
1582
0
            format!("Err getting index in RedisStore::update_data::versioned for {redis_key}")
1583
0
        })?;
1584
24
        if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {
1585
18
            let current_version = data.current_version();
1586
18
            let data = data.try_into_bytes().err_tip(|| 
{0
1587
0
                format!("Could not convert value to bytes in RedisStore::update_data::versioned for {redis_key}")
1588
0
            })?;
1589
18
            let mut script = self.connection_manager.update_script(redis_key.as_ref());
1590
18
            let mut script_invocation = script
1591
18
                .arg(format!("{current_version}"))
1592
18
                .arg(expiry.unwrap_or(Duration::ZERO).as_secs())
1593
18
                .arg(data.to_vec());
1594
52
            for (name, value) in 
maybe_index18
{
1595
52
                script_invocation = script_invocation.arg(name).arg(value.to_vec());
1596
52
            }
1597
18
            let start = Instant::now();
1598
18
            let (success, new_version): (bool, i64) = match script_invocation
1599
18
                .invoke_async(&mut client.connection_manager)
1600
18
                .await
1601
            {
1602
18
                Ok(v) => v,
1603
0
                Err(err)
1604
0
                    if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) =>
1605
                {
1606
0
                    client.reconnect(&self.connection_manager).await?;
1607
0
                    script_invocation
1608
0
                        .invoke_async(&mut client.connection_manager)
1609
0
                        .await
1610
0
                        .err_tip(|| format!("(after reconnect) In RedisStore::update_data::versioned for {key:?}"))?
1611
                }
1612
0
                Err(err) => {
1613
0
                    let mut error: Error = err.into();
1614
0
                    error
1615
0
                        .messages
1616
0
                        .push(format!("In RedisStore::update_data::versioned for {key:?}"));
1617
0
                    return Err(error);
1618
                }
1619
            };
1620
1621
18
            let elapsed = start.elapsed();
1622
1623
18
            if elapsed > Duration::from_millis(100) {
1624
0
                warn!(
1625
                    %redis_key,
1626
                    ?elapsed,
1627
                    "Slow Redis version-set operation"
1628
                );
1629
18
            }
1630
18
            if !success {
1631
4
                warn!(
1632
                    %redis_key,
1633
                    %key,
1634
                    %current_version,
1635
                    %new_version,
1636
4
                    caller = core::any::type_name::<T>(),
1637
                    "Redis version conflict - optimistic lock failed"
1638
                );
1639
4
                return Ok(None);
1640
14
            }
1641
14
            trace!(
1642
                %redis_key,
1643
                %key,
1644
                old_version = %current_version,
1645
                %new_version,
1646
                "Updated redis key to new version"
1647
            );
1648
            // If we have a publish channel configured, send a notice that the key has been set.
1649
14
            if let Some(
pub_sub_channel12
) = &self.pub_sub_channel {
1650
12
                return Ok(client
1651
12
                    .connection_manager
1652
12
                    .publish(pub_sub_channel, redis_key.as_ref())
1653
12
                    .await
?0
);
1654
2
            }
1655
2
            Ok(Some(new_version))
1656
        } else {
1657
6
            let data = data.try_into_bytes().err_tip(|| 
{0
1658
0
                format!("Could not convert value to bytes in RedisStore::update_data::noversion for {redis_key}")
1659
0
            })?;
1660
6
            let mut fields: Vec<(String, _)> = vec![];
1661
6
            fields.push((DATA_FIELD_NAME.into(), data.to_vec()));
1662
6
            for (name, value) in maybe_index {
1663
6
                fields.push((name.into(), value.to_vec()));
1664
6
            }
1665
6
            match client
1666
6
                .connection_manager
1667
6
                .hset_multiple::<_, _, _, ()>(redis_key.as_ref(), &fields)
1668
6
                .await
1669
            {
1670
5
                Ok(_v) => {
1671
5
                    if let Some(
expiry_v2
) = expiry {
1672
2
                        let seconds =
1673
2
                            TryInto::<i64>::try_into(expiry_v.as_secs()).err_tip(|| 
{0
1674
0
                                format!("Expiry seconds doesn't map to i64: {expiry_v:#?}")
1675
0
                            })?;
1676
2
                        let expiry_result: u8 = client
1677
2
                            .connection_manager
1678
2
                            .expire(redis_key.as_ref(), seconds)
1679
2
                            .await
1680
2
                            .err_tip(|| 
{0
1681
0
                                format!(
1682
                                    "In RedisStore::update_data::noversion (expiry) for {redis_key}"
1683
                                )
1684
0
                            })?;
1685
2
                        if expiry_result != 1 {
1686
1
                            warn!(%redis_key, seconds, "Wasn't able to set expiry for Redis key");
1687
1
                        }
1688
3
                    }
1689
                }
1690
1
                Err(err)
1691
1
                    if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) =>
1692
                {
1693
1
                    client.reconnect(&self.connection_manager).await
?0
;
1694
1
                    client
1695
1
                        .connection_manager
1696
1
                        .hset_multiple::<_, _, _, ()>(redis_key.as_ref(), &fields)
1697
1
                        .await
1698
1
                        .err_tip(|| 
format!0
("(after reconnect) In RedisStore::update_data::noversion (hset) for {redis_key}"))
?0
;
1699
1
                    if let Some(expiry_v) = expiry {
1700
1
                        let seconds =
1701
1
                            TryInto::<i64>::try_into(expiry_v.as_secs()).err_tip(|| 
{0
1702
0
                                format!("Expiry seconds doesn't map to i64: {expiry_v:#?}")
1703
0
                            })?;
1704
1
                        let expiry_result: u8 = client.connection_manager.expire(redis_key.as_ref(), seconds).await
1705
1
                        .err_tip(|| 
format!0
("(after reconnect) In RedisStore::update_data::noversion (expiry) for {redis_key}"))
?0
;
1706
1
                        if expiry_result != 1 {
1707
0
                            warn!(%redis_key, seconds, "Wasn't able to set expiry for Redis key");
1708
1
                        }
1709
0
                    }
1710
                }
1711
0
                Err(err) => {
1712
0
                    let mut error: Error = err.into();
1713
0
                    error.messages.push(format!(
1714
                        "In RedisStore::update_data::noversion for {redis_key}"
1715
                    ));
1716
0
                    return Err(error);
1717
                }
1718
            }
1719
            // If we have a publish channel configured, send a notice that the key has been set.
1720
6
            if let Some(
pub_sub_channel3
) = &self.pub_sub_channel {
1721
3
                return Ok(client
1722
3
                    .connection_manager
1723
3
                    .publish(pub_sub_channel, redis_key.as_ref())
1724
3
                    .await
?0
);
1725
3
            }
1726
3
            Ok(Some(0)) // Always use "0" version since this is not a versioned request.
1727
        }
1728
24
    }
1729
1730
28
    async fn search_by_index_prefix<K>(
1731
28
        &self,
1732
28
        index: K,
1733
28
    ) -> Result<
1734
28
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
1735
28
        Error,
1736
28
    >
1737
28
    where
1738
28
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
1739
28
    {
1740
28
        let index_value = index.index_value();
1741
28
        try_sanitize(index_value.as_ref())
1742
28
            .then_some(())
1743
28
            .err_tip(|| 
{0
1744
0
                format!("In RedisStore::search_by_index_prefix::try_sanitize - {index_value:?}")
1745
0
            })?;
1746
31
        let 
run_ft_aggregate28
= |connection_manager: C| async {
1747
31
            ft_aggregate(
1748
31
                connection_manager,
1749
31
                format!(
1750
                    "{}",
1751
31
                    get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1752
                ),
1753
31
                if index_value.is_empty() {
1754
2
                    "*".to_string()
1755
                } else {
1756
29
                    format!("@{}:{{ {} }}", K::INDEX_NAME, index_value)
1757
                },
1758
                FtAggregateOptions {
1759
31
                    load: vec![DATA_FIELD_NAME.into(), VERSION_FIELD_NAME.into()],
1760
31
                    cursor: FtAggregateCursor {
1761
31
                        count: self.max_count_per_cursor,
1762
31
                        max_idle: CURSOR_IDLE_MS,
1763
31
                    },
1764
31
                    sort_by: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| 
vec!27
[
format!27
("@{v}")]),
1765
                },
1766
            )
1767
31
            .await
1768
62
        };
1769
28
        let run_ft_create = |connection_manager: C| async 
{3
1770
3
            let mut schema = vec![SearchSchema {
1771
3
                field_name: K::INDEX_NAME.into(),
1772
3
                sortable: false,
1773
3
            }];
1774
3
            if let Some(sort_key) = K::MAYBE_SORT_KEY {
1775
3
                schema.push(SearchSchema {
1776
3
                    field_name: sort_key.into(),
1777
3
                    sortable: true,
1778
3
                });
1779
3
            
}0
1780
3
            let create_options = FtCreateOptions {
1781
3
                prefixes: vec![K::KEY_PREFIX.into()],
1782
3
                nohl: true,
1783
3
                nofields: true,
1784
3
                nofreqs: true,
1785
3
                nooffsets: true,
1786
3
                temporary: Some(INDEX_TTL_S),
1787
3
            };
1788
3
            let index = format!(
1789
                "{}",
1790
3
                get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1791
            );
1792
3
            ft_create(connection_manager, index, create_options, schema).await
1793
6
        };
1794
1795
28
        let (connection_manager, connect_id) = self.connection_manager.get_connection().await
?0
;
1796
28
        let 
stream25
= match run_ft_aggregate(connection_manager.clone()).await {
1797
0
            Err(err)
1798
3
                if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly
)0
=>
1799
            {
1800
0
                let (connection_manager, _connect_id) =
1801
0
                    self.connection_manager.reconnect(connect_id).await?;
1802
0
                run_ft_aggregate(connection_manager).await.err_tip(|| {
1803
0
                    format!(
1804
                        "Error with reconnected ft_aggregate in RedisStore::search_by_index_prefix({})",
1805
0
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1806
                    )
1807
0
                })
1808
            }
1809
            Err(_) => {
1810
3
                let (connection_manager, result) =
1811
3
                    match run_ft_create(connection_manager.clone()).await {
1812
0
                        Err(err)
1813
3
                            if err.kind()
1814
0
                                == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) =>
1815
                        {
1816
0
                            let (connection_manager, _connect_id) =
1817
0
                                self.connection_manager.reconnect(connect_id).await?;
1818
                            (
1819
0
                                connection_manager.clone(),
1820
0
                                run_ft_create(connection_manager).await,
1821
                            )
1822
                        }
1823
3
                        result => (connection_manager, result),
1824
                    };
1825
1826
                // RediSearch returns ErrorKind::Extension with code "Index"
1827
                // and detail along the lines of "Index already exists" when
1828
                // FT.CREATE races with another node.
1829
3
                let create_result = result.or_else(|e| {
1830
3
                    let is_already_exists = e.kind() == redis::ErrorKind::Extension
1831
2
                        && e.code() == Some("Index")
1832
1
                        && e.detail()
1833
1
                            .is_some_and(|d| d.to_ascii_lowercase().contains("already exists"));
1834
3
                    if is_already_exists {
1835
1
                        Ok(())
1836
                    } else {
1837
2
                        Err(e).err_tip(|| {
1838
2
                            format!(
1839
                                "Error with ft_create in RedisStore::search_by_index_prefix({})",
1840
2
                                get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1841
                            )
1842
2
                        })
1843
                    }
1844
3
                });
1845
1846
3
                let run_result = run_ft_aggregate(connection_manager).await.err_tip(|| {
1847
3
                    format!(
1848
                        "Error with second ft_aggregate in RedisStore::search_by_index_prefix({})",
1849
3
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1850
                    )
1851
3
                });
1852
1853
                // Creating the index will race which is ok. If it fails to create, we only
1854
                // error if the second ft_aggregate call fails and fails to create.
1855
3
                run_result.or_else(move |e| create_result.merge(Err(e)))
1856
            }
1857
25
            Ok(stream) => Ok(stream),
1858
3
        }?;
1859
1860
25
        Ok(stream.filter_map(|result| async move 
{16
1861
16
            let raw_redis_map = match result {
1862
16
                Ok(v) => v,
1863
0
                Err(e) => {
1864
                    return Some(
1865
0
                        Err(Error::from(e))
1866
0
                            .err_tip(|| "Error in stream of in RedisStore::search_by_index_prefix"),
1867
                    );
1868
                }
1869
            };
1870
1871
16
            if 
matches!15
(raw_redis_map, Value::Int(_)) {
1872
1
                return None;
1873
15
            }
1874
1875
15
            let Some(redis_map) = raw_redis_map.as_sequence() else {
1876
0
                return Some(Err(Error::new(
1877
0
                    Code::Internal,
1878
0
                    format!("Non-array from ft_aggregate: {raw_redis_map:?}"),
1879
0
                )));
1880
            };
1881
15
            let mut redis_map_iter = redis_map.iter();
1882
15
            let mut bytes_data: Option<Bytes> = None;
1883
15
            let mut version: Option<i64> = None;
1884
46
            while let Some(
key31
) = redis_map_iter.next() {
1885
31
                let value = redis_map_iter.next().unwrap();
1886
31
                let Value::BulkString(k) = key else {
1887
0
                    return Some(Err(Error::new(
1888
0
                        Code::Internal,
1889
0
                        format!("Non-BulkString key from ft_aggregate: {key:?}"),
1890
0
                    )));
1891
                };
1892
31
                let Ok(str_key) = str::from_utf8(k) else {
1893
0
                    return Some(Err(Error::new(
1894
0
                        Code::Internal,
1895
0
                        format!("Non-utf8 key from ft_aggregate: {key:?}"),
1896
0
                    )));
1897
                };
1898
31
                let Value::BulkString(v) = value else {
1899
0
                    return Some(Err(Error::new(
1900
0
                        Code::Internal,
1901
0
                        format!("Non-BulkString value from ft_aggregate: {key:?}"),
1902
0
                    )));
1903
                };
1904
31
                match str_key {
1905
31
                    DATA_FIELD_NAME => {
1906
15
                        bytes_data = Some(v.clone().into());
1907
15
                    }
1908
16
                    VERSION_FIELD_NAME => {
1909
15
                        let Ok(str_v) = str::from_utf8(v) else {
1910
0
                            return Some(Err(Error::new(
1911
0
                                Code::Internal,
1912
0
                                format!("Non-utf8 version value from ft_aggregate: {v:?}"),
1913
0
                            )));
1914
                        };
1915
15
                        let Ok(raw_version) = str_v.parse::<i64>() else {
1916
0
                            return Some(Err(Error::new(
1917
0
                                Code::Internal,
1918
0
                                format!("Non-integer version value from ft_aggregate: {str_v:?}"),
1919
0
                            )));
1920
                        };
1921
15
                        version = Some(raw_version);
1922
                    }
1923
1
                    other => {
1924
1
                        if K::MAYBE_SORT_KEY == Some(other) {
1925
1
                            // ignore sort keys
1926
1
                        } else {
1927
0
                            return Some(Err(Error::new(
1928
0
                                Code::Internal,
1929
0
                                format!("Extra keys from ft_aggregate: {other}"),
1930
0
                            )));
1931
                        }
1932
                    }
1933
                }
1934
            }
1935
15
            let Some(found_bytes_data) = bytes_data else {
1936
0
                return Some(Err(Error::new(
1937
0
                    Code::Internal,
1938
0
                    format!("Missing '{DATA_FIELD_NAME}' in ft_aggregate, got: {raw_redis_map:?}"),
1939
0
                )));
1940
            };
1941
            Some(
1942
15
                K::decode(version.unwrap_or(0), found_bytes_data)
1943
15
                    .err_tip(|| "In RedisStore::search_by_index_prefix::decode"),
1944
            )
1945
32
        }))
1946
28
    }
1947
1948
94
    async fn get_and_decode<K>(
1949
94
        &self,
1950
94
        key: K,
1951
94
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
1952
94
    where
1953
94
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
1954
94
    {
1955
94
        let key = key.get_key();
1956
94
        let key = self.encode_key(&key);
1957
94
        let mut client = self.get_client().await
?0
;
1958
94
        let results: Vec<Value> = client
1959
94
            .connection_manager
1960
94
            .hmget::<_, Vec<String>, Vec<Value>>(
1961
94
                key.as_ref(),
1962
94
                vec![VERSION_FIELD_NAME.into(), DATA_FIELD_NAME.into()],
1963
94
            )
1964
94
            .await
1965
94
            .err_tip(|| 
format!0
("In RedisStore::get_without_version::notversioned {key}"))
?0
;
1966
94
        let Some(Value::BulkString(
data48
)) = results.get(1) else {
1967
46
            return Ok(None);
1968
        };
1969
        #[allow(clippy::get_first)]
1970
48
        let version = if let Some(raw_v) = results.get(0) {
1971
48
            match raw_v {
1972
0
                Value::Int(v) => *v,
1973
46
                Value::BulkString(v) => i64::from_str(str::from_utf8(v).expect("utf-8 bulkstring"))
1974
46
                    .expect("integer bulkstring"),
1975
2
                Value::Nil => 0,
1976
                _ => {
1977
0
                    warn!(?raw_v, "Non-integer version!");
1978
0
                    0
1979
                }
1980
            }
1981
        } else {
1982
0
            0
1983
        };
1984
        Ok(Some(
1985
48
            K::decode(version, Bytes::from(data.clone())).err_tip(|| 
{0
1986
0
                format!("In RedisStore::get_with_version::notversioned::decode {key}")
1987
0
            })?,
1988
        ))
1989
94
    }
1990
}