Coverage Report

Created: 2026-04-16 01:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/redis_store.rs
Line
Count
Source
1
// Copyright 2024-2026 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::fmt::Debug;
17
use core::marker::PhantomData;
18
use core::ops::{Bound, RangeBounds};
19
use core::pin::Pin;
20
use core::str::FromStr;
21
use core::time::Duration;
22
use std::borrow::Cow;
23
use std::collections::HashSet;
24
use std::sync::{Arc, Weak};
25
use std::time::Instant;
26
27
use async_trait::async_trait;
28
use bytes::Bytes;
29
use const_format::formatcp;
30
use futures::stream::FuturesUnordered;
31
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt, future};
32
use itertools::izip;
33
use nativelink_config::stores::{RedisMode, RedisSpec};
34
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
35
use nativelink_metric::MetricsComponent;
36
use nativelink_redis_tester::SubscriptionManagerNotify;
37
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
38
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
39
use nativelink_util::spawn;
40
use nativelink_util::store_trait::{
41
    BoolValue, RemoveItemCallback, SchedulerCurrentVersionProvider, SchedulerIndexProvider,
42
    SchedulerStore, SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
43
    SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,
44
};
45
use nativelink_util::task::JoinHandleDropGuard;
46
use parking_lot::{Mutex, RwLock};
47
use patricia_tree::StringPatriciaMap;
48
use redis::aio::{ConnectionLike, ConnectionManager, ConnectionManagerConfig};
49
use redis::cluster::ClusterClient;
50
use redis::cluster_async::ClusterConnection;
51
use redis::sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType};
52
use redis::{
53
    AsyncCommands, AsyncIter, Client, IntoConnectionInfo, PushInfo, ScanOptions, Script, Value,
54
    pipe,
55
};
56
use tokio::select;
57
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
58
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
59
use tokio::time::{sleep, timeout};
60
use tokio_stream::wrappers::UnboundedReceiverStream;
61
use tracing::{debug, error, info, trace, warn};
62
use url::Url;
63
use uuid::Uuid;
64
65
use crate::cas_utils::is_zero_digest;
66
use crate::redis_utils::{
67
    FtAggregateCursor, FtAggregateOptions, FtCreateOptions, SearchSchema, ft_aggregate, ft_create,
68
};
69
70
/// The default size of the read chunk when reading data from Redis.
71
/// Note: If this changes it should be updated in the config documentation.
72
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;
73
74
/// The default size of the connection pool if not specified.
75
/// Note: If this changes it should be updated in the config documentation.
76
const DEFAULT_CONNECTION_POOL_SIZE: usize = 3;
77
78
/// The default delay between retries if not specified.
79
/// Note: If this changes it should be updated in the config documentation.
80
const DEFAULT_RETRY_DELAY: f32 = 0.1;
81
82
/// The default connection timeout in milliseconds if not specified.
83
/// Note: If this changes it should be updated in the config documentation.
84
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;
85
86
/// The default command timeout in milliseconds if not specified.
87
/// Note: If this changes it should be updated in the config documentation.
88
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;
89
90
/// The default maximum number of chunk uploads per update.
91
/// Note: If this changes it should be updated in the config documentation.
92
pub const DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE: usize = 10;
93
94
/// The default COUNT value passed when scanning keys in Redis.
95
/// Note: If this changes it should be updated in the config documentation.
96
const DEFAULT_SCAN_COUNT: usize = 10_000;
97
98
/// The default COUNT value passed when scanning search indexes
99
/// Note: If this changes it should be updated in the config documentation.
100
pub const DEFAULT_MAX_COUNT_PER_CURSOR: u64 = 1_500;
101
102
const DEFAULT_CLIENT_PERMITS: usize = 500;
103
104
/// A wrapper around Redis to allow it to be reconnected.
105
pub trait RedisManager<C>
106
where
107
    C: ConnectionLike + Clone,
108
{
109
    /// Get a connection manager and a unique identifier for this connection
110
    /// which may be used to issue a reconnect later.
111
    fn get_connection(&self) -> impl Future<Output = Result<(C, Uuid), Error>> + Send;
112
113
    /// Reconnect if the uuid matches the uuid returned from `get_connection()`.
114
    fn reconnect(&self, uuid: Uuid) -> impl Future<Output = Result<(C, Uuid), Error>> + Send;
115
116
    /// Get an invocation of the update version script for a given `key`.
117
    fn update_script(&self, key: &str) -> redis::ScriptInvocation<'_>;
118
119
    /// Configure the connection to have a psubscribe on it and perform the
120
    /// subscription on reconnect.
121
    fn psubscribe(&self, pattern: &str) -> impl Future<Output = Result<(), Error>> + Send;
122
}
123
124
#[derive(Debug)]
125
pub struct ClusterRedisManager<C>
126
where
127
    C: ConnectionLike + Clone,
128
{
129
    /// A constant Uuid, we never reconnect.
130
    uuid: Uuid,
131
132
    /// Redis script used to update a value in redis if the version matches.
133
    /// This is done by incrementing the version number and then setting the new
134
    /// data only if the version number matches the existing version number.
135
    update_if_version_matches_script: Script,
136
137
    /// The client pool connecting to the backing Redis instance(s).
138
    connection_manager: C,
139
}
140
141
impl<C> ClusterRedisManager<C>
142
where
143
    C: ConnectionLike + Clone,
144
{
145
13
    pub async fn new(mut connection_manager: C) -> Result<Self, Error> {
146
13
        let update_if_version_matches_script = Script::new(LUA_VERSION_SET_SCRIPT);
147
13
        update_if_version_matches_script
148
13
            .load_async(&mut connection_manager)
149
13
            .await
?0
;
150
13
        Ok(Self {
151
13
            uuid: Uuid::new_v4(),
152
13
            update_if_version_matches_script,
153
13
            connection_manager,
154
13
        })
155
13
    }
156
}
157
158
impl<C> RedisManager<C> for ClusterRedisManager<C>
159
where
160
    C: ConnectionLike + Clone + Send + Sync,
161
{
162
31
    fn get_connection(&self) -> impl Future<Output = Result<(C, Uuid), Error>> + Send {
163
31
        future::ready(Ok((self.connection_manager.clone(), self.uuid)))
164
31
    }
165
166
0
    fn reconnect(&self, _uuid: Uuid) -> impl Future<Output = Result<(C, Uuid), Error>> + Send {
167
0
        self.get_connection()
168
0
    }
169
170
0
    fn update_script(&self, key: &str) -> redis::ScriptInvocation<'_> {
171
0
        self.update_if_version_matches_script.key(key)
172
0
    }
173
174
0
    fn psubscribe(&self, _pattern: &str) -> impl Future<Output = Result<(), Error>> + Send {
175
        // This is a no-op for cluster connections.
176
0
        future::ready(Ok(()))
177
0
    }
178
}
179
180
type RedisConnectFuture<C> = dyn Future<Output = Result<C, Error>> + Send;
181
type RedisConnectFn<C> = dyn Fn() -> Pin<Box<RedisConnectFuture<C>>> + Send + Sync;
182
183
pub struct StandardRedisManager<C>
184
where
185
    C: ConnectionLike + Clone,
186
{
187
    /// Function used to re-connect to Redis.
188
    connect_func: Box<RedisConnectFn<C>>,
189
190
    /// Redis script used to update a value in redis if the version matches.
191
    /// This is done by incrementing the version number and then setting the new
192
    /// data only if the version number matches the existing version number.
193
    update_if_version_matches_script: Script,
194
195
    /// The client pool connecting to the backing Redis instance(s) and a Uuid
196
    /// for this connection in order to avoid multiple reconnection attempts.
197
    connection_manager: tokio::sync::RwLock<(C, Uuid)>,
198
199
    /// A list of subscription that should be performed on reconnect.
200
    subscriptions: Mutex<HashSet<String>>,
201
}
202
203
impl<C> Debug for StandardRedisManager<C>
204
where
205
    C: ConnectionLike + Clone,
206
{
207
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
208
0
        f.debug_struct("StandardRedisManager")
209
0
            .field(
210
0
                "update_if_version_matches_script",
211
0
                &self.update_if_version_matches_script,
212
0
            )
213
0
            .field("subscriptions", &self.subscriptions)
214
0
            .finish()
215
0
    }
216
}
217
218
impl<C> StandardRedisManager<C>
219
where
220
    C: ConnectionLike + Clone + Send + Sync,
221
{
222
14
    async fn configure(&self, connection_manager: &mut C) -> Result<(), Error> {
223
14
        self.update_if_version_matches_script
224
14
            .load_async(connection_manager)
225
14
            .await
?0
;
226
14
        Ok(())
227
14
    }
228
229
15
    async fn new(connect_func: Box<RedisConnectFn<C>>) -> Result<Self, Error> {
230
15
        let 
connection_manager12
= connect_func().await
?3
;
231
12
        let update_if_version_matches_script = Script::new(LUA_VERSION_SET_SCRIPT);
232
12
        let connection = Self {
233
12
            connect_func,
234
12
            update_if_version_matches_script,
235
12
            connection_manager: tokio::sync::RwLock::new((connection_manager, Uuid::new_v4())),
236
12
            subscriptions: Mutex::new(HashSet::new()),
237
12
        };
238
        {
239
12
            let mut connection_manager = connection.connection_manager.write().await;
240
12
            connection.configure(&mut connection_manager.0).await
?0
;
241
        }
242
12
        Ok(connection)
243
15
    }
244
}
245
246
impl RedisManager<ConnectionManager> for StandardRedisManager<ConnectionManager> {
247
142
    async fn get_connection(&self) -> Result<(ConnectionManager, Uuid), Error> {
248
142
        Ok(self.connection_manager.read().await.clone())
249
142
    }
250
251
2
    async fn reconnect(&self, uuid: Uuid) -> Result<(ConnectionManager, Uuid), Error> {
252
2
        let mut guard = self.connection_manager.write().await;
253
2
        if guard.1 != uuid {
254
0
            let connection = guard.clone();
255
0
            drop(guard);
256
0
            return Ok(connection);
257
2
        }
258
2
        let mut connection_manager = (self.connect_func)().await
?0
;
259
2
        let uuid = Uuid::new_v4();
260
2
        self.configure(&mut connection_manager).await
?0
;
261
2
        let subscriptions = {
262
2
            let guard = self.subscriptions.lock();
263
2
            guard.iter().map(Clone::clone).collect::<Vec<_>>()
264
        };
265
2
        for 
subscription0
in subscriptions {
266
0
            connection_manager.psubscribe(&subscription).await?;
267
        }
268
2
        *guard = (connection_manager.clone(), uuid);
269
2
        Ok((connection_manager, uuid))
270
2
    }
271
272
17
    fn update_script(&self, key: &str) -> redis::ScriptInvocation<'_> {
273
17
        self.update_if_version_matches_script.key(key)
274
17
    }
275
276
3
    async fn psubscribe(&self, pattern: &str) -> Result<(), Error> {
277
3
        let mut connection = self.get_connection().await
?0
.0;
278
3
        let new_subscription = self.subscriptions.lock().insert(String::from(pattern));
279
3
        if new_subscription {
280
3
            let result = connection.psubscribe(pattern).await;
281
3
            if result.is_err() {
282
0
                self.subscriptions.lock().remove(pattern);
283
3
            }
284
3
            result
?0
;
285
0
        }
286
3
        Ok(())
287
3
    }
288
}
289
290
/// A [`StoreDriver`] implementation that uses Redis as a backing store.
291
#[derive(MetricsComponent)]
292
pub struct RedisStore<C, M>
293
where
294
    C: ConnectionLike + Clone,
295
    M: RedisManager<C>,
296
{
297
    /// The client pool connecting to the backing Redis instance(s).
298
    connection_manager: M,
299
300
    /// The underlying connection type in the connection manager.
301
    _connection_type: PhantomData<C>,
302
303
    /// A channel to publish updates to when a key is added, removed, or modified.
304
    #[metric(
305
        help = "The pubsub channel to publish updates to when a key is added, removed, or modified"
306
    )]
307
    pub_sub_channel: Option<String>,
308
309
    /// A function used to generate names for temporary keys.
310
    temp_name_generator_fn: fn() -> String,
311
312
    /// A common prefix to append to all keys before they are sent to Redis.
313
    ///
314
    /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`).
315
    #[metric(help = "Prefix to append to all keys before sending to Redis")]
316
    key_prefix: String,
317
318
    /// The amount of data to read from Redis at a time.
319
    #[metric(help = "The amount of data to read from Redis at a time")]
320
    read_chunk_size: usize,
321
322
    /// The maximum number of chunk uploads per update.
323
    /// This is used to limit the number of chunk uploads per update to prevent
324
    /// overloading when uploading large blocks of data
325
    #[metric(help = "The maximum number of chunk uploads per update")]
326
    max_chunk_uploads_per_update: usize,
327
328
    /// The COUNT value passed when scanning keys in Redis.
329
    /// This is used to hint the amount of work that should be done per response.
330
    #[metric(help = "The COUNT value passed when scanning keys in Redis")]
331
    scan_count: usize,
332
333
    /// The COUNT value used with search indexes
334
    #[metric(help = "The maximum number of results to return per cursor")]
335
    max_count_per_cursor: u64,
336
337
    /// A manager for subscriptions to keys in Redis.
338
    subscription_manager: tokio::sync::OnceCell<Arc<RedisSubscriptionManager>>,
339
340
    /// Channel for getting subscription messages
341
    subscriber_channel: Mutex<Option<UnboundedReceiver<PushInfo>>>,
342
343
    /// Permits to limit inflight Redis requests. Technically only
344
    /// limits the calls to `get_client()`, but the requests per client
345
    /// are small enough that it works well enough.
346
    client_permits: Arc<Semaphore>,
347
}
348
349
impl<C, M> Debug for RedisStore<C, M>
350
where
351
    C: ConnectionLike + Clone,
352
    M: RedisManager<C>,
353
{
354
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
355
0
        f.debug_struct("RedisStore")
356
0
            .field("temp_name_generator_fn", &self.temp_name_generator_fn)
357
0
            .field("key_prefix", &self.key_prefix)
358
0
            .field("read_chunk_size", &self.read_chunk_size)
359
0
            .field(
360
0
                "max_chunk_uploads_per_update",
361
0
                &self.max_chunk_uploads_per_update,
362
0
            )
363
0
            .field("scan_count", &self.scan_count)
364
0
            .field("subscription_manager", &self.subscription_manager)
365
0
            .field("subscriber_channel", &self.subscriber_channel)
366
0
            .field("client_permits", &self.client_permits)
367
0
            .finish()
368
0
    }
369
}
370
371
struct ClientWithPermit<C: ConnectionLike> {
372
    connection_manager: C,
373
    uuid: Uuid,
374
375
    // here so it sticks around with the client and doesn't get dropped until that does
376
    #[allow(dead_code)]
377
    semaphore_permit: OwnedSemaphorePermit,
378
}
379
380
impl<C: ConnectionLike + Clone> ClientWithPermit<C> {
381
1
    async fn reconnect<M: RedisManager<C> + Sync>(&mut self, manager: &M) -> Result<(), Error> {
382
1
        (self.connection_manager, self.uuid) = manager.reconnect(self.uuid).await
?0
;
383
1
        Ok(())
384
1
    }
385
}
386
387
impl<C: ConnectionLike> Drop for ClientWithPermit<C> {
388
139
    fn drop(&mut self) {
389
139
        trace!(
390
139
            remaining = self.semaphore_permit.semaphore().available_permits(),
391
            "Dropping a client permit"
392
        );
393
139
    }
394
}
395
396
impl<C, M> RedisStore<C, M>
397
where
398
    C: ConnectionLike + Clone + Sync,
399
    M: RedisManager<C> + Sync,
400
{
401
    /// Used for testing when determinism is required.
402
    #[expect(clippy::too_many_arguments)]
403
25
    pub async fn new_from_builder_and_parts(
404
25
        pub_sub_channel: Option<String>,
405
25
        temp_name_generator_fn: fn() -> String,
406
25
        key_prefix: String,
407
25
        read_chunk_size: usize,
408
25
        max_chunk_uploads_per_update: usize,
409
25
        scan_count: usize,
410
25
        max_client_permits: usize,
411
25
        max_count_per_cursor: u64,
412
25
        subscriber_channel: UnboundedReceiver<PushInfo>,
413
25
        connection_manager: M,
414
25
    ) -> Result<Self, Error> {
415
25
        info!("Redis index fingerprint: {FINGERPRINT_CREATE_INDEX_HEX}");
416
417
25
        Ok(Self {
418
25
            connection_manager,
419
25
            _connection_type: PhantomData,
420
25
            pub_sub_channel,
421
25
            temp_name_generator_fn,
422
25
            key_prefix,
423
25
            read_chunk_size,
424
25
            max_chunk_uploads_per_update,
425
25
            scan_count,
426
25
            subscription_manager: tokio::sync::OnceCell::new(),
427
25
            subscriber_channel: Mutex::new(Some(subscriber_channel)),
428
25
            client_permits: Arc::new(Semaphore::new(max_client_permits)),
429
25
            max_count_per_cursor,
430
25
        })
431
25
    }
432
433
139
    async fn get_client(&self) -> Result<ClientWithPermit<C>, Error> {
434
139
        let local_client_permits = self.client_permits.clone();
435
139
        let remaining = local_client_permits.available_permits();
436
139
        let semaphore_permit = local_client_permits.acquire_owned().await
?0
;
437
139
        trace!(remaining, "Got a client permit");
438
139
        let (connection_manager, uuid) = self.connection_manager.get_connection().await
?0
;
439
139
        Ok(ClientWithPermit {
440
139
            connection_manager,
441
139
            uuid,
442
139
            semaphore_permit,
443
139
        })
444
139
    }
445
446
    /// Encode a [`StoreKey`] so it can be sent to Redis.
447
133
    fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> {
448
133
        let key_body = key.as_str();
449
133
        if self.key_prefix.is_empty() {
450
129
            key_body
451
        } else {
452
            // This is in the hot path for all redis operations, so we try to reuse the allocation
453
            // from `key.as_str()` if possible.
454
4
            match key_body {
455
4
                Cow::Owned(mut encoded_key) => {
456
4
                    encoded_key.insert_str(0, &self.key_prefix);
457
4
                    Cow::Owned(encoded_key)
458
                }
459
0
                Cow::Borrowed(body) => {
460
0
                    let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());
461
0
                    encoded_key.push_str(&self.key_prefix);
462
0
                    encoded_key.push_str(body);
463
0
                    Cow::Owned(encoded_key)
464
                }
465
            }
466
        }
467
133
    }
468
469
15
    fn set_spec_defaults(spec: &mut RedisSpec) -> Result<(), Error> {
470
15
        if spec.addresses.is_empty() {
471
0
            return Err(make_err!(
472
0
                Code::InvalidArgument,
473
0
                "No addresses were specified in redis store configuration."
474
0
            ));
475
15
        }
476
477
15
        if spec.broadcast_channel_capacity != 0 {
478
1
            warn!("broadcast_channel_capacity in Redis spec is deprecated and ignored");
479
14
        }
480
15
        if spec.response_timeout_s != 0 {
481
0
            warn!(
482
                "response_timeout_s in Redis spec is deprecated and ignored, use command_timeout_ms"
483
            );
484
15
        }
485
15
        if spec.connection_timeout_s != 0 {
486
0
            if spec.connection_timeout_ms != 0 {
487
0
                return Err(make_err!(
488
0
                    Code::InvalidArgument,
489
0
                    "Both connection_timeout_s and connection_timeout_ms were set, can only have one!"
490
0
                ));
491
0
            }
492
0
            warn!("connection_timeout_s in Redis spec is deprecated, use connection_timeout_ms");
493
0
            spec.connection_timeout_ms = spec.connection_timeout_s * 1000;
494
15
        }
495
15
        if spec.connection_timeout_ms == 0 {
496
10
            spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS;
497
10
        
}5
498
15
        if spec.command_timeout_ms == 0 {
499
13
            spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS;
500
13
        
}2
501
15
        if spec.connection_pool_size == 0 {
502
15
            spec.connection_pool_size = DEFAULT_CONNECTION_POOL_SIZE;
503
15
        
}0
504
15
        if spec.read_chunk_size == 0 {
505
15
            spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
506
15
        
}0
507
15
        if spec.max_count_per_cursor == 0 {
508
15
            spec.max_count_per_cursor = DEFAULT_MAX_COUNT_PER_CURSOR;
509
15
        
}0
510
15
        if spec.max_chunk_uploads_per_update == 0 {
511
15
            spec.max_chunk_uploads_per_update = DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE;
512
15
        
}0
513
15
        if spec.scan_count == 0 {
514
15
            spec.scan_count = DEFAULT_SCAN_COUNT;
515
15
        
}0
516
15
        if spec.max_client_permits == 0 {
517
15
            spec.max_client_permits = DEFAULT_CLIENT_PERMITS;
518
15
        
}0
519
15
        if spec.retry.delay == 0.0 {
520
15
            spec.retry.delay = DEFAULT_RETRY_DELAY;
521
15
        
}0
522
15
        if spec.retry.max_retries == 0 {
523
15
            spec.retry.max_retries = 1;
524
15
        
}0
525
15
        trace!(?spec, "redis spec is after setting defaults");
526
15
        Ok(())
527
15
    }
528
529
    // Only used by tests, because we need to make a real redis connection, then fix this to get fixed values
530
3
    pub fn replace_temp_name_generator(&mut self, replacement: fn() -> String) {
531
3
        self.temp_name_generator_fn = replacement;
532
3
    }
533
}
534
535
impl RedisStore<ClusterConnection, ClusterRedisManager<ClusterConnection>> {
536
0
    pub async fn new_cluster(mut spec: RedisSpec) -> Result<Arc<Self>, Error> {
537
0
        if spec.mode != RedisMode::Cluster {
538
0
            return Err(Error::new(
539
0
                Code::InvalidArgument,
540
0
                "new_cluster only works for Cluster mode".to_string(),
541
0
            ));
542
0
        }
543
0
        Self::set_spec_defaults(&mut spec)?;
544
545
0
        let parsed_addrs: Vec<_> = spec
546
0
            .addresses
547
0
            .iter_mut()
548
0
            .map(|addr| {
549
0
                addr.clone().into_connection_info().map(|connection_info| {
550
0
                    let redis_settings = connection_info
551
0
                        .redis_settings()
552
0
                        .clone()
553
                        // We need RESP3 here because the cluster mode doesn't support RESP2 pubsub
554
                        // See also https://docs.rs/redis/latest/redis/cluster_async/index.html#pubsub
555
0
                        .set_protocol(redis::ProtocolVersion::RESP3);
556
0
                    connection_info.set_redis_settings(redis_settings)
557
0
                })
558
0
            })
559
0
            .collect::<Result<Vec<_>, _>>()?;
560
561
0
        let connection_timeout = Duration::from_millis(spec.connection_timeout_ms);
562
0
        let command_timeout = Duration::from_millis(spec.command_timeout_ms);
563
0
        let (tx, subscriber_channel) = unbounded_channel();
564
565
0
        let builder = ClusterClient::builder(parsed_addrs)
566
0
            .connection_timeout(connection_timeout)
567
0
            .response_timeout(command_timeout)
568
0
            .push_sender(tx)
569
0
            .retries(u32::try_from(spec.retry.max_retries)?);
570
571
0
        let client = builder.build()?;
572
573
0
        Self::new_from_builder_and_parts(
574
0
            spec.experimental_pub_sub_channel,
575
0
            || Uuid::new_v4().to_string(),
576
0
            spec.key_prefix.clone(),
577
0
            spec.read_chunk_size,
578
0
            spec.max_chunk_uploads_per_update,
579
0
            spec.scan_count,
580
0
            spec.max_client_permits,
581
0
            spec.max_count_per_cursor,
582
0
            subscriber_channel,
583
0
            ClusterRedisManager::new(client.get_async_connection().await?).await?,
584
        )
585
0
        .await
586
0
        .map(Arc::new)
587
0
    }
588
}
589
590
impl RedisStore<ConnectionManager, StandardRedisManager<ConnectionManager>> {
591
17
    async fn connect(
592
17
        spec: RedisSpec,
593
17
        tx: UnboundedSender<PushInfo>,
594
17
    ) -> Result<ConnectionManager, Error> {
595
17
        let connection_timeout = Duration::from_millis(spec.connection_timeout_ms);
596
17
        let command_timeout = Duration::from_millis(spec.command_timeout_ms);
597
598
17
        let addr = &spec.addresses[0];
599
17
        let local_addr = addr.clone();
600
17
        let mut parsed_addr = local_addr
601
17
            .replace("redis+sentinel://", "redis://")
602
17
            .into_connection_info()
?0
;
603
604
17
        let redis_settings = parsed_addr
605
17
            .redis_settings()
606
17
            .clone()
607
            // We need RESP3 here because we want to do set_push_sender
608
17
            .set_protocol(redis::ProtocolVersion::RESP3);
609
17
        parsed_addr = parsed_addr.set_redis_settings(redis_settings);
610
17
        debug!(?parsed_addr, "Parsed redis addr");
611
612
17
        let 
client16
= timeout(
613
17
            connection_timeout,
614
17
            spawn!("connect", async move {
615
17
                match spec.mode {
616
8
                    RedisMode::Standard => Client::open(parsed_addr).map_err(Into::<Error>::into),
617
                    RedisMode::Cluster => {
618
0
                        return Err(Error::new(
619
0
                            Code::Internal,
620
0
                            "Use RedisStore::new_cluster for cluster connections".to_owned(),
621
0
                        ));
622
                    }
623
9
                    RedisMode::Sentinel => async {
624
9
                        let url_parsing = Url::parse(&local_addr)
?0
;
625
9
                        let master_name = url_parsing
626
9
                            .query_pairs()
627
9
                            .find(|(key, _)| 
key1
==
"sentinelServiceName"1
)
628
9
                            .map_or_else(|| 
"master"8
.
into8
(), |(_, value)|
value1
.
to_string1
());
629
630
9
                        let redis_connection_info = parsed_addr.redis_settings().clone();
631
9
                        let sentinel_connection_info = SentinelNodeConnectionInfo::default()
632
9
                            .set_redis_connection_info(redis_connection_info);
633
634
                        // We fish this out because sentinels don't support db, we need to set it
635
                        // on the client only. See also https://github.com/redis-rs/redis-rs/issues/1950
636
9
                        let original_db = parsed_addr.redis_settings().db();
637
9
                        if original_db != 0 {
638
1
                            // sentinel_connection_info has the actual DB set
639
1
                            let revised_settings = parsed_addr.redis_settings().clone().set_db(0);
640
1
                            parsed_addr = parsed_addr.set_redis_settings(revised_settings);
641
8
                        }
642
643
9
                        SentinelClient::build(
644
9
                            vec![parsed_addr],
645
9
                            master_name,
646
9
                            Some(sentinel_connection_info),
647
9
                            SentinelServerType::Master,
648
                        )
649
9
                        .map_err(Into::<Error>::into)
650
9
                    }
651
18
                    .
and_then9
(|mut s| async move
{9
Ok(
s9
.async_get_client().await) })
652
9
                    .await
?0
653
9
                    .map_err(Into::<Error>::into),
654
                }
655
17
                .err_tip_with_code(|_e| 
{1
656
1
                    (
657
1
                        Code::InvalidArgument,
658
1
                        format!("While connecting to redis with url: {local_addr}"),
659
1
                    )
660
1
                })
661
17
            }),
662
        )
663
17
        .await
664
17
        .err_tip(|| 
format!0
("Timeout while connecting to redis with url: {addr}"))
?0
?0
?1
;
665
666
16
        let connection_manager_config = {
667
16
            ConnectionManagerConfig::new()
668
16
                .set_number_of_retries(spec.retry.max_retries)
669
16
                .set_connection_timeout(Some(connection_timeout))
670
16
                .set_response_timeout(Some(command_timeout))
671
16
                .set_push_sender(tx)
672
        };
673
674
14
        let mut connection_manager =
675
16
            ConnectionManager::new_with_config(client, connection_manager_config)
676
16
                .await
677
16
                .err_tip(|| 
format!2
("While connecting to redis with url: {addr}"))
?2
;
678
679
14
        if let Some(
pub_sub_channel3
) = spec.experimental_pub_sub_channel {
680
3
            connection_manager.psubscribe(pub_sub_channel).await
?0
;
681
11
        }
682
683
14
        Ok(connection_manager)
684
17
    }
685
686
    /// Create a new `RedisStore` from the given configuration.
    ///
    /// "Standard" mode requires exactly one address; per the error below,
    /// cluster topologies are discovered from that single node rather than
    /// being listed explicitly.
    ///
    /// # Errors
    /// * `Code::Unimplemented` when more than one address is configured.
    /// * Any error from applying spec defaults or establishing the initial
    ///   connection via `StandardRedisManager::new`.
    pub async fn new_standard(mut spec: RedisSpec) -> Result<Arc<Self>, Error> {
        // Fill unset spec fields with their defaults before validating.
        Self::set_spec_defaults(&mut spec)?;

        if spec.addresses.len() != 1 {
            return Err(make_err!(
                Code::Unimplemented,
                "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discover to find the other nodes."
            ));
        }

        // Channel that forwards pub/sub push messages from the connection
        // (see `Self::connect`) into the subscription machinery.
        let (tx, subscriber_channel) = unbounded_channel();

        Self::new_from_builder_and_parts(
            spec.experimental_pub_sub_channel.clone(),
            // Generator for unique temporary upload-key names.
            || Uuid::new_v4().to_string(),
            spec.key_prefix.clone(),
            spec.read_chunk_size,
            spec.max_chunk_uploads_per_update,
            spec.scan_count,
            spec.max_client_permits,
            spec.max_count_per_cursor,
            subscriber_channel,
            // The manager owns a connect-factory closure; `spec` and `tx`
            // are moved in and cloned per connection attempt so reconnects
            // reuse the same configuration and push channel.
            StandardRedisManager::new(Box::new(move || {
                Box::pin(Self::connect(spec.clone(), tx.clone()))
            }))
            .await?,
        )
        .await
        .map(Arc::new)
    }
717
}
718
719
#[async_trait]
impl<C, M> StoreDriver for RedisStore<C, M>
where
    C: ConnectionLike + Clone + Send + Sync + Unpin + 'static,
    M: RedisManager<C> + Unpin + Send + Sync + 'static,
{
    /// Checks existence of each key, writing `Some(byte_len)` or `None`
    /// into the matching `results` slot. Zero digests are always reported
    /// as present with length 0, without touching redis.
    async fn has_with_results(
        self: Pin<&Self>,
        keys: &[StoreKey<'_>],
        results: &mut [Option<u64>],
    ) -> Result<(), Error> {
        // TODO(palfrey) We could use pipeline here, but it makes retry more
        // difficult and it doesn't work very well in cluster mode.
        // If we wanted to optimize this with pipeline be careful to
        // implement retry and to support cluster mode.
        izip!(keys.iter(), results.iter_mut(),)
            .map(|(key, result)| async move {
                // We need to do a special pass to ensure our zero key exist.
                if is_zero_digest(key.borrow()) {
                    *result = Some(0);
                    return Ok::<_, Error>(());
                }
                let encoded_key = self.encode_key(key);

                let mut client = self.get_client().await?;

                // Redis returns 0 when the key doesn't exist
                // AND when the key exists with value of length 0.
                // Therefore, we need to check both length and existence
                // and do it in a pipeline for efficiency
                let (blob_len, exists) = pipe()
                    .strlen(encoded_key.as_ref())
                    .exists(encoded_key.as_ref())
                    .query_async::<(u64, bool)>(&mut client.connection_manager)
                    .await
                    .err_tip(|| "In RedisStore::has_with_results::all")?;

                *result = if exists { Some(blob_len) } else { None };

                Ok::<_, Error>(())
            })
            // Run all per-key lookups concurrently; fail fast on first error.
            .collect::<FuturesUnordered<_>>()
            .try_collect()
            .await
    }

    /// Lists keys in `range` via SCAN, invoking `handler` for each match.
    /// Returns the number of keys passed to the handler.
    ///
    /// Any per-key problem (non-utf8 key, prefix mismatch, handler
    /// returning false) is accumulated and reported as a single
    /// `Code::Internal` error after the scan completes.
    async fn list(
        self: Pin<&Self>,
        range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
        handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
    ) -> Result<u64, Error> {
        let range = (
            range.0.map(StoreKey::into_owned),
            range.1.map(StoreKey::into_owned),
        );
        // Build a SCAN glob from the longest common prefix of the range
        // bounds; unbounded ends fall back to matching every prefixed key.
        let pattern = match range.0 {
            Bound::Included(ref start) | Bound::Excluded(ref start) => match range.1 {
                Bound::Included(ref end) | Bound::Excluded(ref end) => {
                    let start = start.as_str();
                    let end = end.as_str();
                    let max_length = start.len().min(end.len());
                    // First index where start and end diverge; everything
                    // before it is shared by all keys in the range.
                    let length = start
                        .chars()
                        .zip(end.chars())
                        .position(|(a, b)| a != b)
                        .unwrap_or(max_length);
                    format!("{}{}*", self.key_prefix, &start[..length])
                }
                Bound::Unbounded => format!("{}*", self.key_prefix),
            },
            Bound::Unbounded => format!("{}*", self.key_prefix),
        };
        let mut client = self.get_client().await?;
        trace!(%pattern, count=self.scan_count, "Running SCAN");
        let opts = ScanOptions::default()
            .with_pattern(pattern)
            .with_count(self.scan_count);
        let mut scan_stream: AsyncIter<Value> = client
            .connection_manager
            .scan_options(opts)
            .await
            .err_tip(|| "During scan_options")?;
        let mut iterations = 0;
        // Collect problems instead of aborting so the scan always drains.
        let mut errors = vec![];
        while let Some(key) = scan_stream.next_item().await {
            if let Ok(Value::BulkString(raw_key)) = key {
                let Ok(str_key) = str::from_utf8(&raw_key) else {
                    error!(?raw_key, "Non-utf8 key");
                    errors.push(format!("Non-utf8 key {raw_key:?}"));
                    continue;
                };
                if let Some(key) = str_key.strip_prefix(&self.key_prefix) {
                    let key = StoreKey::new_str(key);
                    // SCAN's glob is only a prefix filter; re-check the
                    // actual bounds before invoking the handler.
                    if range.contains(&key) {
                        iterations += 1;
                        if !handler(&key) {
                            error!("Issue in handler");
                            errors.push("Issue in handler".to_string());
                        }
                    } else {
                        trace!(%key, ?range, "Key not in range");
                    }
                } else {
                    errors.push("Key doesn't match prefix".to_string());
                }
            } else {
                error!(?key, "Non-string in key");
                errors.push("Non-string in key".to_string());
            }
        }
        if errors.is_empty() {
            Ok(iterations)
        } else {
            error!(?errors, "Errors in scan stream");
            Err(Error::new(Code::Internal, format!("Errors: {errors:?}")))
        }
    }

    /// Uploads a blob: chunks are SETRANGE'd concurrently into a temporary
    /// key, the total length is verified with STRLEN, then the temp key is
    /// RENAME'd onto the final key and (optionally) a pub/sub notice sent.
    async fn update(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        mut reader: DropCloserReadHalf,
        _upload_size: UploadSizeInfo,
    ) -> Result<(), Error> {
        let final_key = self.encode_key(&key);

        // While the name generation function can be supplied by the user, we need to have the curly
        // braces in place in order to manage redis' hashing behavior and make sure that the temporary
        // key name and the final key name are directed to the same cluster node. See
        // https://redis.io/blog/redis-clustering-best-practices-with-keys/
        //
        // The TL;DR is that if we're in cluster mode and the names hash differently, we can't use request
        // pipelining. By using these braces, we tell redis to only hash the part of the temporary key that's
        // identical to the final key -- so they will always hash to the same node.
        let temp_key = format!(
            "temp-{}-{{{}}}",
            (self.temp_name_generator_fn)(),
            &final_key
        );

        if is_zero_digest(key.borrow()) {
            let chunk = reader
                .peek()
                .await
                .err_tip(|| "Failed to peek in RedisStore::update")?;
            if chunk.is_empty() {
                reader
                    .drain()
                    .await
                    .err_tip(|| "Failed to drain in RedisStore::update")?;
                // Zero-digest keys are special -- we don't need to do anything with it.
                return Ok(());
            }
        }

        let mut client = self.get_client().await?;

        // Turn the reader into a stream of (offset, running_total, chunk)
        // triples; `scan` threads the cumulative byte count through.
        let mut read_stream = reader
            .scan(0u32, |bytes_read, chunk_res| {
                future::ready(Some(
                    chunk_res
                        .err_tip(|| "Failed to read chunk in update in redis store")
                        .and_then(|chunk| {
                            let offset = isize::try_from(*bytes_read).err_tip(|| "Could not convert offset to isize in RedisStore::update")?;
                            let chunk_len = u32::try_from(chunk.len()).err_tip(
                                || "Could not convert chunk length to u32 in RedisStore::update",
                            )?;
                            let new_bytes_read = bytes_read
                                .checked_add(chunk_len)
                                .err_tip(|| "Overflow protection in RedisStore::update")?;
                            *bytes_read = new_bytes_read;
                            Ok::<_, Error>((offset, *bytes_read, chunk))
                        }),
                ))
            })
            .map(|res| {
                let (offset, end_pos, chunk) = res?;
                let temp_key_ref = &temp_key;
                Ok(async move {
                    let (mut connection_manager, connect_id) = self.connection_manager.get_connection().await?;
                    match connection_manager
                        .setrange::<_, _, usize>(temp_key_ref, offset, chunk.to_vec())
                        .await {
                        Ok(_) => {},
                        // A ReadOnly server error means we are talking to a
                        // replica; reconnect once and retry the write.
                        Err(err)
                            if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) =>
                        {
                            let (mut connection_manager, _connect_id) = self.connection_manager.reconnect(connect_id).await?;
                            connection_manager
                                .setrange::<_, _, usize>(temp_key_ref, offset, chunk.to_vec())
                                .await
                                .err_tip(
                                    || format!("(after reconnect) while appending to temp key ({temp_key_ref}) in RedisStore::update. offset = {offset}. end_pos = {end_pos}"),
                                )?;
                        }
                        Err(err) => {
                            let mut error: Error = err.into();
                            error
                                .messages
                                .push(format!("While appending to temp key ({temp_key_ref}) in RedisStore::update. offset = {offset}. end_pos = {end_pos}"));
                            return Err(error);
                        }
                    }
                    Ok::<u32, Error>(end_pos)
                })
            })
            // Allow up to max_chunk_uploads_per_update SETRANGEs in flight.
            .try_buffer_unordered(self.max_chunk_uploads_per_update);

        // Chunks may complete out of order; track the largest end position.
        let mut total_len: u32 = 0;
        while let Some(last_pos) = read_stream.try_next().await? {
            if last_pos > total_len {
                total_len = last_pos;
            }
        }

        let blob_len: usize = client
            .connection_manager
            .strlen(&temp_key)
            .await
            .err_tip(|| format!("In RedisStore::update strlen check for {temp_key}"))?;
        // This is a safety check to ensure that in the event some kind of retry was to happen
        // and the data was appended to the key twice, we reject the data.
        if blob_len != usize::try_from(total_len).unwrap_or(usize::MAX) {
            return Err(make_input_err!(
                "Data length mismatch in RedisStore::update for {}({}) - expected {} bytes, got {} bytes",
                key.borrow().as_str(),
                temp_key,
                total_len,
                blob_len,
            ));
        }

        // Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost.
        client
            .connection_manager
            .rename::<_, _, ()>(&temp_key, final_key.as_ref())
            .await
            .err_tip(|| "While queueing key rename in RedisStore::update()")?;

        // If we have a publish channel configured, send a notice that the key has been set.
        if let Some(pub_sub_channel) = &self.pub_sub_channel {
            return Ok(client
                .connection_manager
                .publish(pub_sub_channel, final_key.as_ref())
                .await?);
        }

        Ok(())
    }

    /// Streams `length` bytes of the blob starting at `offset` into
    /// `writer`, reading at most `read_chunk_size` bytes per GETRANGE.
    /// Returns `Code::NotFound` when nothing was written and the key does
    /// not exist.
    async fn get_part(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        writer: &mut DropCloserWriteHalf,
        offset: u64,
        length: Option<u64>,
    ) -> Result<(), Error> {
        let offset = isize::try_from(offset).err_tip(|| "Could not convert offset to isize")?;
        let length = length
            .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
            .transpose()?;

        // To follow RBE spec we need to consider any digest's with
        // zero size to be existing.
        if is_zero_digest(key.borrow()) {
            return writer
                .send_eof()
                .err_tip(|| "Failed to send zero EOF in redis store get_part");
        }

        let encoded_key = self.encode_key(&key);
        let encoded_key = encoded_key.as_ref();

        // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we
        // do math with indices we change them to be exclusive at the end.

        // We want to read the data at the key from `offset` to `offset + length`.
        let data_start = offset;
        let data_end = data_start
            .saturating_add(length.unwrap_or(isize::MAX as usize) as isize)
            .saturating_sub(1);

        // And we don't ever want to read more than `read_chunk_size` bytes at a time, so we'll need to iterate.
        let mut chunk_start = data_start;
        let mut chunk_end = cmp::min(
            data_start.saturating_add(self.read_chunk_size as isize) - 1,
            data_end,
        );

        let mut client = self.get_client().await?;
        loop {
            let chunk: Bytes = client
                .connection_manager
                .getrange(encoded_key, chunk_start, chunk_end)
                .await
                .err_tip(|| "In RedisStore::get_part::getrange")?;

            // A short chunk means the key ended before our requested range.
            let didnt_receive_full_chunk = chunk.len() < self.read_chunk_size;
            let reached_end_of_data = chunk_end == data_end;

            if didnt_receive_full_chunk || reached_end_of_data {
                if !chunk.is_empty() {
                    writer
                        .send(chunk)
                        .await
                        .err_tip(|| "Failed to write data in RedisStore::get_part")?;
                }

                break; // No more data to read.
            }

            // We received a full chunk's worth of data, so write it...
            writer
                .send(chunk)
                .await
                .err_tip(|| "Failed to write data in RedisStore::get_part")?;

            // ...and go grab the next chunk.
            chunk_start = chunk_end + 1;
            chunk_end = cmp::min(
                chunk_start.saturating_add(self.read_chunk_size as isize) - 1,
                data_end,
            );
        }

        // If we didn't write any data, check if the key exists, if not return a NotFound error.
        // This is required by spec.
        if writer.get_bytes_written() == 0 {
            // We're supposed to read 0 bytes, so just check if the key exists.
            let exists: bool = client
                .connection_manager
                .exists(encoded_key)
                .await
                .err_tip(|| "In RedisStore::get_part::zero_exists")?;

            if !exists {
                return Err(make_err!(
                    Code::NotFound,
                    "Data not found in Redis store for digest: {key:?}"
                ));
            }
        }

        writer
            .send_eof()
            .err_tip(|| "Failed to write EOF in redis store get_part")
    }

    /// This store has no inner store; always returns itself.
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }

    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) {
        self
    }

    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> {
        self
    }

    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
        registry.register_indicator(self);
    }

    fn register_remove_callback(
        self: Arc<Self>,
        _callback: Arc<dyn RemoveItemCallback>,
    ) -> Result<(), Error> {
        // As redis doesn't drop stuff, we can just ignore this
        Ok(())
    }
}
1092
1093
#[async_trait]
1094
impl<C, M> HealthStatusIndicator for RedisStore<C, M>
1095
where
1096
    C: ConnectionLike + Clone + Send + Sync + Unpin + 'static,
1097
    M: RedisManager<C> + Send + Sync + Unpin + 'static,
1098
{
1099
1
    fn get_name(&self) -> &'static str {
1100
1
        "RedisStore"
1101
1
    }
1102
1103
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
1104
        StoreDriver::check_health(Pin::new(self), namespace).await
1105
0
    }
1106
}
1107
1108
// -------------------------------------------------------------------
1109
// Below this line are specific to the redis scheduler implementation.
1110
// -------------------------------------------------------------------
1111
1112
/// The time in milliseconds that a redis cursor can be idle before it is closed.
const CURSOR_IDLE_MS: u64 = 30_000;
/// The name of the field in the Redis hash that stores the data.
const DATA_FIELD_NAME: &str = "data";
/// The name of the field in the Redis hash that stores the version.
/// Incremented via HINCRBY in [`LUA_VERSION_SET_SCRIPT`] for optimistic
/// concurrency control.
const VERSION_FIELD_NAME: &str = "version";
/// The time to live of indexes in seconds. After this time redis may delete the index.
const INDEX_TTL_S: u64 = 60 * 60 * 24; // 24 hours.
1120
1121
#[allow(rustdoc::broken_intra_doc_links)]
/// Lua script to set a key if the version matches.
/// Args:
///   KEYS[1]: The key where the version is stored.
///   ARGV[1]: The expected version.
///   ARGV[2]: The new data.
///   ARGV[3*]: Key-value pairs of additional data to include.
/// Returns:
///   A two-element array. `{ 1, new_version }` when the expected version
///   matched and the data was written, or `{ 0, current_version }` when
///   the versions did not match and nothing was changed.
pub const LUA_VERSION_SET_SCRIPT: &str = formatcp!(
    r"
local key = KEYS[1]
local expected_version = tonumber(ARGV[1])
local new_data = ARGV[2]
local new_version = redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', 1)
local i
local indexes = {{}}

if new_version-1 ~= expected_version then
    redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', -1)
    return {{ 0, new_version-1 }}
end
-- Skip first 2 argvs, as they are known inputs.
-- Remember: Lua is 1-indexed.
for i=3, #ARGV do
    indexes[i-2] = ARGV[i]
end

-- In testing we witnessed redis sometimes not update our FT indexes
-- resulting in stale data. It appears if we delete our keys then insert
-- them again it works and reduces risk significantly.
redis.call('DEL', key)
redis.call('HSET', key, '{DATA_FIELD_NAME}', new_data, '{VERSION_FIELD_NAME}', new_version, unpack(indexes))

return {{ 1, new_version }}
"
);
1159
1160
/// This is the output of the calculations below hardcoded into the executable.
const FINGERPRINT_CREATE_INDEX_HEX: &str = "3e762c15";

#[cfg(test)]
mod test {
    use super::FINGERPRINT_CREATE_INDEX_HEX;

    /// String of the `FT.CREATE` command used to create the index template.
    const CREATE_INDEX_TEMPLATE: &str = "FT.CREATE {} ON HASH PREFIX 1 {} NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA {} TAG CASESENSITIVE SORTABLE";

    /// Compile-time fingerprint of the `FT.CREATE` command used to create the
    /// index template. This is a simple CRC32 checksum of the command string.
    /// We don't care about it actually being a valid CRC32 checksum, just that
    /// it's a unique identifier with a low chance of collision.
    const fn fingerprint_create_index_template() -> u32 {
        const POLY: u32 = 0xEDB8_8320;
        const DATA: &[u8] = CREATE_INDEX_TEMPLATE.as_bytes();
        let mut state: u32 = 0xFFFF_FFFF;
        let mut idx: usize = 0;
        while idx < DATA.len() {
            state ^= DATA[idx] as u32;

            // Process eight bits: shift right, folding in the polynomial
            // whenever the bit shifted out was set.
            let mut bit = 0;
            while bit < 8 {
                let low_bit_was_set = state & 1 != 0;
                state >>= 1;
                if low_bit_was_set {
                    state ^= POLY;
                }
                bit += 1;
            }
            idx += 1;
        }
        state
    }

    /// Verify that our calculation always evaluates to this fixed value.
    #[test]
    fn test_fingerprint_value() {
        assert_eq!(
            format!("{:08x}", &fingerprint_create_index_template()),
            FINGERPRINT_CREATE_INDEX_HEX,
        );
    }
}
1206
1207
/// Get the name of the index to create for the given field.
/// This will add some prefix data to the name to try and ensure
/// if the index definition changes, the name will get a new name.
///
/// `$maybe_sort` is an `Option<&str>`; when `None` the sort segment is
/// left empty. The trailing [`FINGERPRINT_CREATE_INDEX_HEX`] ties the
/// name to the `FT.CREATE` template fingerprint so schema changes
/// produce a fresh index name.
macro_rules! get_index_name {
    ($prefix:expr, $field:expr, $maybe_sort:expr) => {
        // format_args! avoids allocating; callers format or compare in place.
        format_args!(
            "{}_{}_{}_{}",
            $prefix,
            $field,
            $maybe_sort.unwrap_or(""),
            FINGERPRINT_CREATE_INDEX_HEX
        )
    };
}
1221
1222
/// Try to sanitize a string to be used as a Redis key.
/// The string is never modified; this only reports whether every byte is
/// an ASCII alphanumeric character or an underscore.
const fn try_sanitize(s: &str) -> bool {
    // `for` loops and iterators are not available in `const fn`, so we walk
    // the bytes with an explicit index. Keeping this `const` lets the
    // compiler evaluate the whole check at compile time when the input is
    // a constant.
    let bytes = s.as_bytes();
    let mut idx: usize = 0;
    while idx < bytes.len() {
        let byte = bytes[idx];
        let allowed = byte.is_ascii_alphanumeric() || byte == b'_';
        if !allowed {
            return false;
        }
        idx += 1;
    }
    true
}
1243
1244
/// An individual subscription to a key in Redis.
#[derive(Debug)]
pub struct RedisSubscription {
    // Watch receiver carrying the subscribed key; `Option` so that `Drop`
    // can `take()` it and release it before checking receiver counts.
    receiver: Option<tokio::sync::watch::Receiver<String>>,
    // Weak handle to the shared key->publisher map so `Drop` can remove
    // the entry once no receivers remain (without keeping the map alive).
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
}
1250
1251
impl SchedulerSubscription for RedisSubscription {
1252
    /// Wait for the subscription key to change.
1253
18
    async fn changed(&mut self) -> Result<(), Error> {
1254
18
        let receiver = self
1255
18
            .receiver
1256
18
            .as_mut()
1257
18
            .ok_or_else(|| 
make_err!0
(
Code::Internal0
, "In RedisSubscription::changed::as_mut"))
?0
;
1258
18
        receiver.changed().await.
map_err15
(|err|
{0
1259
0
            Error::from_std_err(Code::Internal, &err)
1260
0
                .append("In RedisSubscription::changed::changed")
1261
0
        })
1262
15
    }
1263
}
1264
1265
// If the subscription is dropped, we need to possibly remove the key from the
// subscribed keys map.
impl Drop for RedisSubscription {
    fn drop(&mut self) {
        // Take the receiver out so we can drop it explicitly below.
        let Some(receiver) = self.receiver.take() else {
            warn!("RedisSubscription has already been dropped, nothing to do.");
            return; // Already dropped, nothing to do.
        };
        // The watch channel's value is the subscribed key itself.
        let key = receiver.borrow().clone();
        // IMPORTANT: This must be dropped before receiver_count() is called.
        drop(receiver);
        let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else {
            return; // Already dropped, nothing to do.
        };
        let mut subscribed_keys = subscribed_keys.write();
        let Some(value) = subscribed_keys.get(&key) else {
            error!(
                "Key {key} was not found in subscribed keys when checking if it should be removed."
            );
            return;
        };
        // If we have no receivers, cleanup the entry from our map.
        // (Our own receiver was dropped above, so a count of 0 means no
        // other subscription to this key is alive.)
        if value.receiver_count() == 0 {
            subscribed_keys.remove(key);
        }
    }
}
1292
1293
/// A publisher for a key in Redis.
///
/// Wraps a `tokio::sync::watch` sender; `notify` wakes every receiver
/// subscribed to this key.
#[derive(Debug)]
struct RedisSubscriptionPublisher {
    // Mutex is required because notification is driven from a shared
    // (non-mut) iterator over the key map; see the TODO in `notify`.
    sender: Mutex<tokio::sync::watch::Sender<String>>,
}
1298
1299
impl RedisSubscriptionPublisher {
1300
4
    fn new(
1301
4
        key: String,
1302
4
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
1303
4
    ) -> (Self, RedisSubscription) {
1304
4
        let (sender, receiver) = tokio::sync::watch::channel(key);
1305
4
        let publisher = Self {
1306
4
            sender: Mutex::new(sender),
1307
4
        };
1308
4
        let subscription = RedisSubscription {
1309
4
            receiver: Some(receiver),
1310
4
            weak_subscribed_keys,
1311
4
        };
1312
4
        (publisher, subscription)
1313
4
    }
1314
1315
0
    fn subscribe(
1316
0
        &self,
1317
0
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
1318
0
    ) -> RedisSubscription {
1319
0
        let receiver = self.sender.lock().subscribe();
1320
0
        RedisSubscription {
1321
0
            receiver: Some(receiver),
1322
0
            weak_subscribed_keys,
1323
0
        }
1324
0
    }
1325
1326
2
    fn receiver_count(&self) -> usize {
1327
2
        self.sender.lock().receiver_count()
1328
2
    }
1329
1330
18
    fn notify(&self) {
1331
        // TODO(https://github.com/sile/patricia_tree/issues/40) When this is addressed
1332
        // we can remove the `Mutex` and use the mutable iterator directly.
1333
18
        self.sender.lock().send_modify(|_| {});
1334
18
    }
1335
}
1336
1337
/// Fans redis pub/sub push messages out to per-key subscriptions.
#[derive(Debug, Clone)]
pub struct RedisSubscriptionManager {
    // Key -> publisher map; the background task holds only a Weak to it so
    // the task exits once all managers/subscriptions are gone.
    subscribed_keys: Arc<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
    // Test-only channel that injects key names directly into the
    // notification loop (see `notify_for_test`).
    tx_for_test: UnboundedSender<String>,
    // Guard for the background notification task; presumably the task is
    // torn down when the last clone drops the guard -- TODO confirm
    // JoinHandleDropGuard semantics.
    _subscription_spawn: Arc<Mutex<JoinHandleDropGuard<()>>>,
}
1343
1344
impl RedisSubscriptionManager {
    /// Builds a manager and spawns the background task that translates
    /// incoming pub/sub push messages (and test injections) into
    /// notifications on the matching per-key publishers.
    pub fn new(subscriber_channel: UnboundedReceiver<PushInfo>) -> Self {
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
        // The spawned task only holds a Weak so dropping the manager (and
        // all subscriptions) lets the task observe the map is gone and exit.
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);
        let (tx_for_test, mut rx_for_test) = unbounded_channel();
        let mut local_subscriber_channel = UnboundedReceiverStream::new(subscriber_channel);
        Self {
            subscribed_keys,
            tx_for_test,
            _subscription_spawn: Arc::new(Mutex::new(spawn!(
                "redis_subscribe_spawn",
                async move {
                    // Outer loop: re-entered only after the inner loop breaks
                    // (i.e. the real subscriber channel errored/closed).
                    loop {
                        // Inner loop: pull one key per iteration, either from
                        // the test channel or from the redis push stream.
                        loop {
                            let key = select! {
                                value = rx_for_test.recv() => {
                                    let Some(value) = value else {
                                        // The manager owns the sender, so it
                                        // outlives this task.
                                        unreachable!("Channel should never close");
                                    };
                                    value
                                },
                                maybe_push_info = local_subscriber_channel.next() => {
                                    if let Some(push_info) = maybe_push_info {
                                        // Only PMessage carries a payload we
                                        // care about; everything else is
                                        // ignored or logged.
                                        match push_info.kind {
                                            redis::PushKind::PMessage => {},
                                            redis::PushKind::PSubscribe => {
                                                trace!(?push_info, "PSubscribe, ignore");
                                                continue;
                                            }
                                            _ => {
                                                warn!(?push_info, "Other push_info message, discarded");
                                                continue;
                                            },
                                        }
                                        if push_info.data.len() != 3 {
                                            error!(?push_info, "Expected exactly 3 values on subscriber channel (pattern, channel, value)");
                                            continue;
                                        }
                                        // The third element is the published
                                        // value (the key name).
                                        match push_info.data.last().unwrap() {
                                            Value::SimpleString(s) => {
                                                s.clone()
                                            }
                                            Value::BulkString(v) => {
                                                String::from_utf8(v.clone()).expect("String message")
                                            }
                                            other => {
                                                error!(?other, "Received non-string message in RedisSubscriptionManager");
                                                continue;
                                            }
                                        }
                                    } else {
                                        error!("Error receiving message in RedisSubscriptionManager from subscriber_channel");
                                        // Fall through to the reconnect/
                                        // resync path in the outer loop.
                                        break;
                                    }
                                }
                            };
                            trace!(key, "New subscription manager key");
                            let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
                                warn!(
                                    "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
                                );
                                return;
                            };
                            let subscribed_keys_mux = subscribed_keys.read();
                            // Notify every publisher whose key is a prefix
                            // match for the published key.
                            subscribed_keys_mux
                                .common_prefix_values(&*key)
                                .for_each(RedisSubscriptionPublisher::notify);
                        }
                        // Sleep for a small amount of time to ensure we don't reconnect too quickly.
                        sleep(Duration::from_secs(1)).await;
                        // If we reconnect or lag behind we might have had dirty keys, so we need to
                        // flag all of them as changed.
                        let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
                            warn!(
                                "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
                            );
                            return;
                        };
                        let subscribed_keys_mux = subscribed_keys.read();
                        // Just in case also get a new receiver.
                        for publisher in subscribed_keys_mux.values() {
                            publisher.notify();
                        }
                    }
                }
            ))),
        }
    }
}
1433
1434
impl SubscriptionManagerNotify for RedisSubscriptionManager {
    /// Test-only hook: injects `value` into the manager's internal channel as
    /// if it had arrived from the Redis pub/sub connection, so tests can
    /// trigger key-change notifications without a live Redis server.
    fn notify_for_test(&self, value: String) {
        // The receiving end is owned by the manager's spawned task; if that
        // task has exited the send fails. In a test that is a broken
        // invariant, so panic with a message that names it instead of a bare
        // unwrap().
        self.tx_for_test
            .send(value)
            .expect("RedisSubscriptionManager background task dropped the test receiver");
    }
}
1439
1440
impl SchedulerSubscriptionManager for RedisSubscriptionManager {
    type Subscription = RedisSubscription;

    /// Creates a subscription for change notifications on the key produced by
    /// `key.get_key()`.
    ///
    /// If a publisher is already registered for this key it is reused;
    /// otherwise a new `RedisSubscriptionPublisher` is created and inserted
    /// into `subscribed_keys`. Before returning, the subscription's watch
    /// receiver is marked changed so the caller's first await fires
    /// immediately rather than waiting for the next notification.
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
    where
        K: SchedulerStoreKeyProvider,
    {
        // Downgrade before taking the write lock: publishers/subscriptions
        // hold only a weak reference so they do not keep the key map (and
        // thus the manager) alive.
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
        let mut subscribed_keys = self.subscribed_keys.write();
        let key = key.get_key();
        let key_str = key.as_str();
        let mut subscription = if let Some(publisher) = subscribed_keys.get(&key_str) {
            // A publisher already exists for this key; attach a new
            // subscription to it.
            publisher.subscribe(weak_subscribed_keys)
        } else {
            // First subscriber for this key: create and register a publisher.
            let (publisher, subscription) =
                RedisSubscriptionPublisher::new(key_str.to_string(), weak_subscribed_keys);
            subscribed_keys.insert(key_str, publisher);
            subscription
        };
        subscription
            .receiver
            .as_mut()
            // The receiver is always populated by the constructors above; a
            // None here is an internal invariant violation.
            .ok_or_else(|| {
                make_err!(
                    Code::Internal,
                    "Receiver should be set in RedisSubscriptionManager::subscribe"
                )
            })?
            // Force the first poll of this subscription to observe a change.
            .mark_changed();

        Ok(subscription)
    }

    /// Notifications may be dropped (for example across reconnects or when
    /// the pub/sub stream lags), so this manager reports itself unreliable
    /// and callers must be prepared to re-poll.
    fn is_reliable() -> bool {
        false
    }
}
1477
1478
impl<C, M> SchedulerStore for RedisStore<C, M>
where
    C: Clone + ConnectionLike + Sync + Send + 'static,
    M: RedisManager<C> + Sync + Send + 'static,
{
    type SubscriptionManager = RedisSubscriptionManager;

    /// Lazily creates (once) and returns the shared subscription manager.
    ///
    /// The first call consumes `self.subscriber_channel` (via `take()`) and
    /// subscribes the connection to `pub_sub_channel`; the result is cached
    /// by `get_or_try_init`, so subsequent calls return the same `Arc`.
    /// Errors if the channel was already taken or no pub/sub channel is
    /// configured.
    async fn subscription_manager(&self) -> Result<Arc<RedisSubscriptionManager>, Error> {
        self.subscription_manager
            .get_or_try_init(|| async move {
                // take() ensures the raw subscriber channel is handed to
                // exactly one manager; a second init attempt is an error.
                let Some(subscriber_channel) = self.subscriber_channel.lock().take() else {
                    return Err(make_input_err!(
                        "Multiple attempts to obtain the subscription manager in RedisStore"
                    ));
                };
                let Some(pub_sub_channel) = &self.pub_sub_channel else {
                    return Err(make_input_err!(
                        "RedisStore must have a pubsub for Redis Scheduler if using subscriptions"
                    ));
                };
                self.connection_manager.psubscribe(pub_sub_channel).await?;
                Ok(Arc::new(RedisSubscriptionManager::new(subscriber_channel)))
            })
            .await
            .map(Clone::clone)
    }

    /// Writes `data` to Redis, returning `Ok(Some(version))` on success or
    /// `Ok(None)` when a versioned write loses an optimistic-lock race.
    ///
    /// Versioned keys go through a Lua script (`update_script`) that performs
    /// a compare-and-swap on the stored version; unversioned keys are written
    /// with a plain HSET of the data field plus any index fields. In both
    /// paths, a configured pub/sub channel gets a PUBLISH of the key so
    /// subscribers are notified of the change.
    async fn update_data<T>(&self, data: T) -> Result<Option<i64>, Error>
    where
        T: SchedulerStoreDataProvider
            + SchedulerStoreKeyProvider
            + SchedulerCurrentVersionProvider
            + Send,
    {
        let key = data.get_key();
        let redis_key = self.encode_key(&key);
        let mut client = self.get_client().await?;
        let maybe_index = data.get_indexes().err_tip(|| {
            format!("Err getting index in RedisStore::update_data::versioned for {redis_key}")
        })?;
        if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {
            let current_version = data.current_version();
            let data = data.try_into_bytes().err_tip(|| {
                format!("Could not convert value to bytes in RedisStore::update_data::versioned for {redis_key}")
            })?;
            // Script args: expected version, payload, then (name, value)
            // pairs for each secondary index field.
            let mut script = self.connection_manager.update_script(redis_key.as_ref());
            let mut script_invocation = script.arg(format!("{current_version}")).arg(data.to_vec());
            for (name, value) in maybe_index {
                script_invocation = script_invocation.arg(name).arg(value.to_vec());
            }
            let start = Instant::now();
            let (success, new_version): (bool, i64) = match script_invocation
                .invoke_async(&mut client.connection_manager)
                .await
            {
                Ok(v) => v,
                // A READONLY reply means we are talking to a replica (e.g.
                // after failover): reconnect once and retry the script.
                Err(err)
                    if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) =>
                {
                    client.reconnect(&self.connection_manager).await?;
                    script_invocation
                        .invoke_async(&mut client.connection_manager)
                        .await
                        .err_tip(|| format!("(after reconnect) In RedisStore::update_data::versioned for {key:?}"))?
                }
                Err(err) => {
                    let mut error: Error = err.into();
                    error
                        .messages
                        .push(format!("In RedisStore::update_data::versioned for {key:?}"));
                    return Err(error);
                }
            };

            let elapsed = start.elapsed();

            // Surface unusually slow CAS operations for operators.
            if elapsed > Duration::from_millis(100) {
                warn!(
                    %redis_key,
                    ?elapsed,
                    "Slow Redis version-set operation"
                );
            }
            if !success {
                // Optimistic lock lost: another writer bumped the version.
                // Callers are expected to re-read and retry.
                warn!(
                    %redis_key,
                    %key,
                    %current_version,
                    %new_version,
                    caller = core::any::type_name::<T>(),
                    "Redis version conflict - optimistic lock failed"
                );
                return Ok(None);
            }
            trace!(
                %redis_key,
                %key,
                old_version = %current_version,
                %new_version,
                "Updated redis key to new version"
            );
            // If we have a publish channel configured, send a notice that the key has been set.
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
                // NOTE(review): the value returned here is whatever PUBLISH
                // decodes to (inferred as Option<i64>), not
                // `Some(new_version)` — verify callers don't rely on the
                // returned version when pub/sub is enabled. TODO confirm.
                return Ok(client
                    .connection_manager
                    .publish(pub_sub_channel, redis_key.as_ref())
                    .await?);
            }
            Ok(Some(new_version))
        } else {
            let data = data.try_into_bytes().err_tip(|| {
                format!("Could not convert value to bytes in RedisStore::update_data::noversion for {redis_key}")
            })?;
            // Unversioned path: one hash holding the payload plus any
            // secondary index fields, written in a single HSET.
            let mut fields: Vec<(String, _)> = vec![];
            fields.push((DATA_FIELD_NAME.into(), data.to_vec()));
            for (name, value) in maybe_index {
                fields.push((name.into(), value.to_vec()));
            }
            match client
                .connection_manager
                .hset_multiple::<_, _, _, ()>(redis_key.as_ref(), &fields)
                .await
            {
                Ok(v) => v,
                // Same replica/failover handling as the versioned path:
                // reconnect once and retry.
                Err(err)
                    if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) =>
                {
                    client.reconnect(&self.connection_manager).await?;
                    client
                        .connection_manager
                        .hset_multiple::<_, _, _, ()>(redis_key.as_ref(), &fields)
                        .await
                        .err_tip(|| format!("(after reconnect) In RedisStore::update_data::noversion for {redis_key}"))?;
                }
                Err(err) => {
                    let mut error: Error = err.into();
                    error.messages.push(format!(
                        "In RedisStore::update_data::noversion for {redis_key}"
                    ));
                    return Err(error);
                }
            }
            // If we have a publish channel configured, send a notice that the key has been set.
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
                // NOTE(review): as in the versioned path, this forwards the
                // decoded PUBLISH reply instead of `Some(0)` — TODO confirm
                // intended.
                return Ok(client
                    .connection_manager
                    .publish(pub_sub_channel, redis_key.as_ref())
                    .await?);
            }
            Ok(Some(0)) // Always use "0" version since this is not a versioned request.
        }
    }

    /// Streams all entries whose index field matches `index.index_value()`,
    /// using RediSearch `FT.AGGREGATE` with a cursor.
    ///
    /// If the aggregate fails (typically because the index does not exist
    /// yet), the index is created with `FT.CREATE` and the aggregate is
    /// retried; concurrent creation races are tolerated. Each emitted row is
    /// decoded from its `DATA_FIELD_NAME`/`VERSION_FIELD_NAME` pairs via
    /// `K::decode`.
    async fn search_by_index_prefix<K>(
        &self,
        index: K,
    ) -> Result<
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
        Error,
    >
    where
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
    {
        let index_value = index.index_value();
        // Reject values that cannot be safely embedded in a query string.
        try_sanitize(index_value.as_ref())
            .then_some(())
            .err_tip(|| {
                format!("In RedisStore::search_by_index_prefix::try_sanitize - {index_value:?}")
            })?;
        // FT.AGGREGATE over the (possibly not-yet-created) index, loading the
        // payload and version fields, optionally sorted by K::MAYBE_SORT_KEY.
        let run_ft_aggregate = |connection_manager: C| async {
            ft_aggregate(
                connection_manager,
                format!(
                    "{}",
                    get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
                ),
                if index_value.is_empty() {
                    // Empty filter value matches everything.
                    "*".to_string()
                } else {
                    format!("@{}:{{ {} }}", K::INDEX_NAME, index_value)
                },
                FtAggregateOptions {
                    load: vec![DATA_FIELD_NAME.into(), VERSION_FIELD_NAME.into()],
                    cursor: FtAggregateCursor {
                        count: self.max_count_per_cursor,
                        max_idle: CURSOR_IDLE_MS,
                    },
                    sort_by: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| vec![format!("@{v}")]),
                },
            )
            .await
        };
        // FT.CREATE for the index this key type searches on; TEMPORARY so
        // unused indexes expire on their own.
        let run_ft_create = |connection_manager: C| async {
            let mut schema = vec![SearchSchema {
                field_name: K::INDEX_NAME.into(),
                sortable: false,
            }];
            if let Some(sort_key) = K::MAYBE_SORT_KEY {
                schema.push(SearchSchema {
                    field_name: sort_key.into(),
                    sortable: true,
                });
            }
            let create_options = FtCreateOptions {
                prefixes: vec![K::KEY_PREFIX.into()],
                nohl: true,
                nofields: true,
                nofreqs: true,
                nooffsets: true,
                temporary: Some(INDEX_TTL_S),
            };
            let index = format!(
                "{}",
                get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
            );
            ft_create(connection_manager, index, create_options, schema).await
        };

        let (connection_manager, connect_id) = self.connection_manager.get_connection().await?;
        let stream = match run_ft_aggregate(connection_manager.clone()).await {
            // Talking to a replica: reconnect and retry the aggregate once.
            Err(err)
                if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) =>
            {
                let (connection_manager, _connect_id) =
                    self.connection_manager.reconnect(connect_id).await?;
                run_ft_aggregate(connection_manager).await.err_tip(|| {
                    format!(
                        "Error with reconnected ft_aggregate in RedisStore::search_by_index_prefix({})",
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
                    )
                })
            }
            // Any other failure: assume the index is missing, create it, and
            // retry the aggregate.
            Err(_) => {
                let (connection_manager, result) =
                    match run_ft_create(connection_manager.clone()).await {
                        Err(err)
                            if err.kind()
                                == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) =>
                        {
                            let (connection_manager, _connect_id) =
                                self.connection_manager.reconnect(connect_id).await?;
                            (
                                connection_manager.clone(),
                                run_ft_create(connection_manager).await,
                            )
                        }
                        result => (connection_manager, result),
                    };
                let create_result = result.err_tip(|| {
                    format!(
                        "Error with ft_create in RedisStore::search_by_index_prefix({})",
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
                    )
                });

                let run_result = run_ft_aggregate(connection_manager).await.err_tip(|| {
                    format!(
                        "Error with second ft_aggregate in RedisStore::search_by_index_prefix({})",
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
                    )
                });

                // Creating the index will race which is ok. If it fails to create, we only
                // error if the second ft_aggregate call fails and fails to create.
                run_result.or_else(move |e| create_result.merge(Err(e)))
            }
            Ok(stream) => Ok(stream),
        }?;

        // Each aggregate row is a flat [field, value, field, value, ...]
        // array; walk it in pairs and pull out the data/version fields.
        Ok(stream.filter_map(|result| async move {
            let raw_redis_map = match result {
                Ok(v) => v,
                Err(e) => {
                    return Some(
                        Err(Error::from(e))
                            .err_tip(|| "Error in stream of in RedisStore::search_by_index_prefix"),
                    );
                }
            };

            let Some(redis_map) = raw_redis_map.as_sequence() else {
                return Some(Err(Error::new(
                    Code::Internal,
                    format!("Non-array from ft_aggregate: {raw_redis_map:?}"),
                )));
            };
            let mut redis_map_iter = redis_map.iter();
            let mut bytes_data: Option<Bytes> = None;
            let mut version: Option<i64> = None;
            while let Some(key) = redis_map_iter.next() {
                // Fields come in pairs, so a key is always followed by its
                // value. NOTE(review): `.unwrap()` panics on a malformed
                // odd-length reply — TODO confirm that is acceptable here.
                let value = redis_map_iter.next().unwrap();
                let Value::BulkString(k) = key else {
                    return Some(Err(Error::new(
                        Code::Internal,
                        format!("Non-BulkString key from ft_aggregate: {key:?}"),
                    )));
                };
                let Ok(str_key) = str::from_utf8(k) else {
                    return Some(Err(Error::new(
                        Code::Internal,
                        format!("Non-utf8 key from ft_aggregate: {key:?}"),
                    )));
                };
                let Value::BulkString(v) = value else {
                    return Some(Err(Error::new(
                        Code::Internal,
                        format!("Non-BulkString value from ft_aggregate: {key:?}"),
                    )));
                };
                match str_key {
                    DATA_FIELD_NAME => {
                        bytes_data = Some(v.clone().into());
                    }
                    VERSION_FIELD_NAME => {
                        let Ok(str_v) = str::from_utf8(v) else {
                            return Some(Err(Error::new(
                                Code::Internal,
                                format!("Non-utf8 version value from ft_aggregate: {v:?}"),
                            )));
                        };
                        let Ok(raw_version) = str_v.parse::<i64>() else {
                            return Some(Err(Error::new(
                                Code::Internal,
                                format!("Non-integer version value from ft_aggregate: {str_v:?}"),
                            )));
                        };
                        version = Some(raw_version);
                    }
                    other => {
                        if K::MAYBE_SORT_KEY == Some(other) {
                            // ignore sort keys
                        } else {
                            return Some(Err(Error::new(
                                Code::Internal,
                                format!("Extra keys from ft_aggregate: {other}"),
                            )));
                        }
                    }
                }
            }
            let Some(found_bytes_data) = bytes_data else {
                return Some(Err(Error::new(
                    Code::Internal,
                    format!("Missing '{DATA_FIELD_NAME}' in ft_aggregate, got: {raw_redis_map:?}"),
                )));
            };
            Some(
                // Rows without a version field decode as version 0.
                K::decode(version.unwrap_or(0), found_bytes_data)
                    .err_tip(|| "In RedisStore::search_by_index_prefix::decode"),
            )
        }))
    }

    /// Fetches a single key and decodes it via `K::decode`.
    ///
    /// Issues an HMGET for the version and data fields; a missing data field
    /// yields `Ok(None)`. The version is parsed leniently: integer or
    /// numeric-string replies are used as-is, Nil and unrecognized shapes
    /// fall back to 0.
    async fn get_and_decode<K>(
        &self,
        key: K,
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
    where
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
    {
        let key = key.get_key();
        let key = self.encode_key(&key);
        let mut client = self.get_client().await?;
        // results[0] = version field, results[1] = data field.
        let results: Vec<Value> = client
            .connection_manager
            .hmget::<_, Vec<String>, Vec<Value>>(
                key.as_ref(),
                vec![VERSION_FIELD_NAME.into(), DATA_FIELD_NAME.into()],
            )
            .await
            .err_tip(|| format!("In RedisStore::get_without_version::notversioned {key}"))?;
        // No stored data means the key does not exist.
        let Some(Value::BulkString(data)) = results.get(1) else {
            return Ok(None);
        };
        #[allow(clippy::get_first)]
        let version = if let Some(raw_v) = results.get(0) {
            match raw_v {
                Value::Int(v) => *v,
                // NOTE(review): these expects panic on a non-utf8 or
                // non-numeric version string from Redis — TODO confirm that
                // is the intended failure mode.
                Value::BulkString(v) => i64::from_str(str::from_utf8(v).expect("utf-8 bulkstring"))
                    .expect("integer bulkstring"),
                Value::Nil => 0,
                _ => {
                    warn!(?raw_v, "Non-integer version!");
                    0
                }
            }
        } else {
            0
        };
        Ok(Some(
            K::decode(version, Bytes::from(data.clone())).err_tip(|| {
                format!("In RedisStore::get_with_version::notversioned::decode {key}")
            })?,
        ))
    }
}
1522
0
            })?;
1523
17
            let mut script = self.connection_manager.update_script(redis_key.as_ref());
1524
17
            let mut script_invocation = script.arg(format!("{current_version}")).arg(data.to_vec());
1525
50
            for (name, value) in 
maybe_index17
{
1526
50
                script_invocation = script_invocation.arg(name).arg(value.to_vec());
1527
50
            }
1528
17
            let start = Instant::now();
1529
17
            let (success, new_version): (bool, i64) = match script_invocation
1530
17
                .invoke_async(&mut client.connection_manager)
1531
17
                .await
1532
            {
1533
17
                Ok(v) => v,
1534
0
                Err(err)
1535
0
                    if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) =>
1536
                {
1537
0
                    client.reconnect(&self.connection_manager).await?;
1538
0
                    script_invocation
1539
0
                        .invoke_async(&mut client.connection_manager)
1540
0
                        .await
1541
0
                        .err_tip(|| format!("(after reconnect) In RedisStore::update_data::versioned for {key:?}"))?
1542
                }
1543
0
                Err(err) => {
1544
0
                    let mut error: Error = err.into();
1545
0
                    error
1546
0
                        .messages
1547
0
                        .push(format!("In RedisStore::update_data::versioned for {key:?}"));
1548
0
                    return Err(error);
1549
                }
1550
            };
1551
1552
17
            let elapsed = start.elapsed();
1553
1554
17
            if elapsed > Duration::from_millis(100) {
1555
0
                warn!(
1556
                    %redis_key,
1557
                    ?elapsed,
1558
                    "Slow Redis version-set operation"
1559
                );
1560
17
            }
1561
17
            if !success {
1562
4
                warn!(
1563
                    %redis_key,
1564
                    %key,
1565
                    %current_version,
1566
                    %new_version,
1567
4
                    caller = core::any::type_name::<T>(),
1568
                    "Redis version conflict - optimistic lock failed"
1569
                );
1570
4
                return Ok(None);
1571
13
            }
1572
13
            trace!(
1573
                %redis_key,
1574
                %key,
1575
                old_version = %current_version,
1576
                %new_version,
1577
                "Updated redis key to new version"
1578
            );
1579
            // If we have a publish channel configured, send a notice that the key has been set.
1580
13
            if let Some(
pub_sub_channel12
) = &self.pub_sub_channel {
1581
12
                return Ok(client
1582
12
                    .connection_manager
1583
12
                    .publish(pub_sub_channel, redis_key.as_ref())
1584
12
                    .await
?0
);
1585
1
            }
1586
1
            Ok(Some(new_version))
1587
        } else {
1588
4
            let data = data.try_into_bytes().err_tip(|| 
{0
1589
0
                format!("Could not convert value to bytes in RedisStore::update_data::noversion for {redis_key}")
1590
0
            })?;
1591
4
            let mut fields: Vec<(String, _)> = vec![];
1592
4
            fields.push((DATA_FIELD_NAME.into(), data.to_vec()));
1593
4
            for (
name2
,
value2
) in maybe_index {
1594
2
                fields.push((name.into(), value.to_vec()));
1595
2
            }
1596
4
            match client
1597
4
                .connection_manager
1598
4
                .hset_multiple::<_, _, _, ()>(redis_key.as_ref(), &fields)
1599
4
                .await
1600
            {
1601
3
                Ok(v) => v,
1602
1
                Err(err)
1603
1
                    if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) =>
1604
                {
1605
1
                    client.reconnect(&self.connection_manager).await
?0
;
1606
1
                    client
1607
1
                        .connection_manager
1608
1
                        .hset_multiple::<_, _, _, ()>(redis_key.as_ref(), &fields)
1609
1
                        .await
1610
1
                        .err_tip(|| 
format!0
("(after reconnect) In RedisStore::update_data::noversion for {redis_key}"))
?0
;
1611
                }
1612
0
                Err(err) => {
1613
0
                    let mut error: Error = err.into();
1614
0
                    error.messages.push(format!(
1615
                        "In RedisStore::update_data::noversion for {redis_key}"
1616
                    ));
1617
0
                    return Err(error);
1618
                }
1619
            }
1620
            // If we have a publish channel configured, send a notice that the key has been set.
1621
4
            if let Some(
pub_sub_channel3
) = &self.pub_sub_channel {
1622
3
                return Ok(client
1623
3
                    .connection_manager
1624
3
                    .publish(pub_sub_channel, redis_key.as_ref())
1625
3
                    .await
?0
);
1626
1
            }
1627
1
            Ok(Some(0)) // Always use "0" version since this is not a versioned request.
1628
        }
1629
21
    }
1630
1631
24
    async fn search_by_index_prefix<K>(
1632
24
        &self,
1633
24
        index: K,
1634
24
    ) -> Result<
1635
24
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
1636
24
        Error,
1637
24
    >
1638
24
    where
1639
24
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
1640
24
    {
1641
24
        let index_value = index.index_value();
1642
24
        try_sanitize(index_value.as_ref())
1643
24
            .then_some(())
1644
24
            .err_tip(|| 
{0
1645
0
                format!("In RedisStore::search_by_index_prefix::try_sanitize - {index_value:?}")
1646
0
            })?;
1647
25
        let 
run_ft_aggregate24
= |connection_manager: C| async {
1648
25
            ft_aggregate(
1649
25
                connection_manager,
1650
25
                format!(
1651
                    "{}",
1652
25
                    get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1653
                ),
1654
25
                if index_value.is_empty() {
1655
2
                    "*".to_string()
1656
                } else {
1657
23
                    format!("@{}:{{ {} }}", K::INDEX_NAME, index_value)
1658
                },
1659
                FtAggregateOptions {
1660
25
                    load: vec![DATA_FIELD_NAME.into(), VERSION_FIELD_NAME.into()],
1661
25
                    cursor: FtAggregateCursor {
1662
25
                        count: self.max_count_per_cursor,
1663
25
                        max_idle: CURSOR_IDLE_MS,
1664
25
                    },
1665
25
                    sort_by: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| 
vec!22
[
format!22
("@{v}")]),
1666
                },
1667
            )
1668
25
            .await
1669
50
        };
1670
24
        let run_ft_create = |connection_manager: C| async 
{1
1671
1
            let mut schema = vec![SearchSchema {
1672
1
                field_name: K::INDEX_NAME.into(),
1673
1
                sortable: false,
1674
1
            }];
1675
1
            if let Some(sort_key) = K::MAYBE_SORT_KEY {
1676
1
                schema.push(SearchSchema {
1677
1
                    field_name: sort_key.into(),
1678
1
                    sortable: true,
1679
1
                });
1680
1
            
}0
1681
1
            let create_options = FtCreateOptions {
1682
1
                prefixes: vec![K::KEY_PREFIX.into()],
1683
1
                nohl: true,
1684
1
                nofields: true,
1685
1
                nofreqs: true,
1686
1
                nooffsets: true,
1687
1
                temporary: Some(INDEX_TTL_S),
1688
1
            };
1689
1
            let index = format!(
1690
                "{}",
1691
1
                get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1692
            );
1693
1
            ft_create(connection_manager, index, create_options, schema).await
1694
2
        };
1695
1696
24
        let (connection_manager, connect_id) = self.connection_manager.get_connection().await
?0
;
1697
24
        let 
stream23
= match run_ft_aggregate(connection_manager.clone()).await {
1698
0
            Err(err)
1699
1
                if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly
)0
=>
1700
            {
1701
0
                let (connection_manager, _connect_id) =
1702
0
                    self.connection_manager.reconnect(connect_id).await?;
1703
0
                run_ft_aggregate(connection_manager).await.err_tip(|| {
1704
0
                    format!(
1705
                        "Error with reconnected ft_aggregate in RedisStore::search_by_index_prefix({})",
1706
0
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1707
                    )
1708
0
                })
1709
            }
1710
            Err(_) => {
1711
1
                let (connection_manager, result) =
1712
1
                    match run_ft_create(connection_manager.clone()).await {
1713
0
                        Err(err)
1714
1
                            if err.kind()
1715
0
                                == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) =>
1716
                        {
1717
0
                            let (connection_manager, _connect_id) =
1718
0
                                self.connection_manager.reconnect(connect_id).await?;
1719
                            (
1720
0
                                connection_manager.clone(),
1721
0
                                run_ft_create(connection_manager).await,
1722
                            )
1723
                        }
1724
1
                        result => (connection_manager, result),
1725
                    };
1726
1
                let create_result = result.err_tip(|| {
1727
1
                    format!(
1728
                        "Error with ft_create in RedisStore::search_by_index_prefix({})",
1729
1
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1730
                    )
1731
1
                });
1732
1733
1
                let run_result = run_ft_aggregate(connection_manager).await.err_tip(|| {
1734
1
                    format!(
1735
                        "Error with second ft_aggregate in RedisStore::search_by_index_prefix({})",
1736
1
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1737
                    )
1738
1
                });
1739
1740
                // Creating the index will race which is ok. If it fails to create, we only
1741
                // error if the second ft_aggregate call fails and fails to create.
1742
1
                run_result.or_else(move |e| create_result.merge(Err(e)))
1743
            }
1744
23
            Ok(stream) => Ok(stream),
1745
1
        }?;
1746
1747
23
        Ok(stream.filter_map(|result| async move 
{13
1748
13
            let raw_redis_map = match result {
1749
13
                Ok(v) => v,
1750
0
                Err(e) => {
1751
                    return Some(
1752
0
                        Err(Error::from(e))
1753
0
                            .err_tip(|| "Error in stream of in RedisStore::search_by_index_prefix"),
1754
                    );
1755
                }
1756
            };
1757
1758
13
            let Some(redis_map) = raw_redis_map.as_sequence() else {
1759
0
                return Some(Err(Error::new(
1760
0
                    Code::Internal,
1761
0
                    format!("Non-array from ft_aggregate: {raw_redis_map:?}"),
1762
0
                )));
1763
            };
1764
13
            let mut redis_map_iter = redis_map.iter();
1765
13
            let mut bytes_data: Option<Bytes> = None;
1766
13
            let mut version: Option<i64> = None;
1767
40
            while let Some(
key27
) = redis_map_iter.next() {
1768
27
                let value = redis_map_iter.next().unwrap();
1769
27
                let Value::BulkString(k) = key else {
1770
0
                    return Some(Err(Error::new(
1771
0
                        Code::Internal,
1772
0
                        format!("Non-BulkString key from ft_aggregate: {key:?}"),
1773
0
                    )));
1774
                };
1775
27
                let Ok(str_key) = str::from_utf8(k) else {
1776
0
                    return Some(Err(Error::new(
1777
0
                        Code::Internal,
1778
0
                        format!("Non-utf8 key from ft_aggregate: {key:?}"),
1779
0
                    )));
1780
                };
1781
27
                let Value::BulkString(v) = value else {
1782
0
                    return Some(Err(Error::new(
1783
0
                        Code::Internal,
1784
0
                        format!("Non-BulkString value from ft_aggregate: {key:?}"),
1785
0
                    )));
1786
                };
1787
27
                match str_key {
1788
27
                    DATA_FIELD_NAME => {
1789
13
                        bytes_data = Some(v.clone().into());
1790
13
                    }
1791
14
                    VERSION_FIELD_NAME => {
1792
13
                        let Ok(str_v) = str::from_utf8(v) else {
1793
0
                            return Some(Err(Error::new(
1794
0
                                Code::Internal,
1795
0
                                format!("Non-utf8 version value from ft_aggregate: {v:?}"),
1796
0
                            )));
1797
                        };
1798
13
                        let Ok(raw_version) = str_v.parse::<i64>() else {
1799
0
                            return Some(Err(Error::new(
1800
0
                                Code::Internal,
1801
0
                                format!("Non-integer version value from ft_aggregate: {str_v:?}"),
1802
0
                            )));
1803
                        };
1804
13
                        version = Some(raw_version);
1805
                    }
1806
1
                    other => {
1807
1
                        if K::MAYBE_SORT_KEY == Some(other) {
1808
1
                            // ignore sort keys
1809
1
                        } else {
1810
0
                            return Some(Err(Error::new(
1811
0
                                Code::Internal,
1812
0
                                format!("Extra keys from ft_aggregate: {other}"),
1813
0
                            )));
1814
                        }
1815
                    }
1816
                }
1817
            }
1818
13
            let Some(found_bytes_data) = bytes_data else {
1819
0
                return Some(Err(Error::new(
1820
0
                    Code::Internal,
1821
0
                    format!("Missing '{DATA_FIELD_NAME}' in ft_aggregate, got: {raw_redis_map:?}"),
1822
0
                )));
1823
            };
1824
            Some(
1825
13
                K::decode(version.unwrap_or(0), found_bytes_data)
1826
13
                    .err_tip(|| "In RedisStore::search_by_index_prefix::decode"),
1827
            )
1828
26
        }))
1829
24
    }
1830
1831
94
    async fn get_and_decode<K>(
1832
94
        &self,
1833
94
        key: K,
1834
94
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
1835
94
    where
1836
94
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
1837
94
    {
1838
94
        let key = key.get_key();
1839
94
        let key = self.encode_key(&key);
1840
94
        let mut client = self.get_client().await
?0
;
1841
94
        let results: Vec<Value> = client
1842
94
            .connection_manager
1843
94
            .hmget::<_, Vec<String>, Vec<Value>>(
1844
94
                key.as_ref(),
1845
94
                vec![VERSION_FIELD_NAME.into(), DATA_FIELD_NAME.into()],
1846
94
            )
1847
94
            .await
1848
94
            .err_tip(|| 
format!0
("In RedisStore::get_without_version::notversioned {key}"))
?0
;
1849
94
        let Some(Value::BulkString(
data48
)) = results.get(1) else {
1850
46
            return Ok(None);
1851
        };
1852
        #[allow(clippy::get_first)]
1853
48
        let version = if let Some(raw_v) = results.get(0) {
1854
48
            match raw_v {
1855
0
                Value::Int(v) => *v,
1856
46
                Value::BulkString(v) => i64::from_str(str::from_utf8(v).expect("utf-8 bulkstring"))
1857
46
                    .expect("integer bulkstring"),
1858
2
                Value::Nil => 0,
1859
                _ => {
1860
0
                    warn!(?raw_v, "Non-integer version!");
1861
0
                    0
1862
                }
1863
            }
1864
        } else {
1865
0
            0
1866
        };
1867
        Ok(Some(
1868
48
            K::decode(version, Bytes::from(data.clone())).err_tip(|| 
{0
1869
0
                format!("In RedisStore::get_with_version::notversioned::decode {key}")
1870
0
            })?,
1871
        ))
1872
94
    }
1873
}