Coverage Report

Created: 2025-12-17 22:46

/build/source/nativelink-store/src/redis_store.rs
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    See LICENSE file for details
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::ops::{Bound, RangeBounds};
use core::pin::Pin;
use core::time::Duration;
use core::{cmp, iter};
use std::borrow::Cow;
use std::sync::{Arc, Weak};

use async_trait::async_trait;
use bytes::Bytes;
use const_format::formatcp;
use fred::clients::SubscriberClient;
use fred::interfaces::{ClientLike, KeysInterface, PubsubInterface};
use fred::prelude::{Client, EventInterface, HashesInterface, RediSearchInterface};
use fred::types::config::{
    Config as RedisConfig, ConnectionConfig, PerformanceConfig, ReconnectPolicy, UnresponsiveConfig,
};
use fred::types::redisearch::{
    AggregateOperation, FtAggregateOptions, FtCreateOptions, IndexKind, Load, SearchField,
    SearchSchema, SearchSchemaKind, WithCursor,
};
use fred::types::scan::Scanner;
use fred::types::scripts::Script;
use fred::types::{Builder, Key as RedisKey, Map as RedisMap, SortOrder, Value as RedisValue};
use futures::stream::FuturesUnordered;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt, future};
use itertools::izip;
use nativelink_config::stores::{RedisMode, RedisSpec};
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
use nativelink_metric::MetricsComponent;
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
use nativelink_util::spawn;
use nativelink_util::store_trait::{
    BoolValue, RemoveItemCallback, SchedulerCurrentVersionProvider, SchedulerIndexProvider,
    SchedulerStore, SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
    SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,
};
use nativelink_util::task::JoinHandleDropGuard;
use parking_lot::{Mutex, RwLock};
use patricia_tree::StringPatriciaMap;
use tokio::select;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::sleep;
use tracing::{error, info, trace, warn};
use uuid::Uuid;

use crate::cas_utils::is_zero_digest;
use crate::redis_utils::ft_aggregate;

/// The default size of the read chunk when reading data from Redis.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;

/// The default size of the connection pool if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_CONNECTION_POOL_SIZE: usize = 3;

/// The default delay between retries if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_RETRY_DELAY: f32 = 0.1;
/// The amount of jitter to add to the retry delay if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_RETRY_JITTER: f32 = 0.5;

/// The default maximum capacity of the broadcast channel if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_BROADCAST_CHANNEL_CAPACITY: usize = 4096;

/// The default connection timeout in milliseconds if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;

/// The default command timeout in milliseconds if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;

/// The default maximum number of chunk uploads per update.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE: usize = 10;

/// The default COUNT value passed when scanning keys in Redis.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_SCAN_COUNT: u32 = 10_000;

/// The default maximum number of in-flight client permits if not specified.
const DEFAULT_CLIENT_PERMITS: usize = 500;

#[derive(Clone, Debug)]
pub struct RecoverablePool {
    clients: Arc<RwLock<Vec<Client>>>,
    builder: Builder,
    counter: Arc<core::sync::atomic::AtomicUsize>,
}

impl RecoverablePool {
    pub fn new(builder: Builder, size: usize) -> Result<Self, Error> {
        let mut clients = Vec::with_capacity(size);
        for _ in 0..size {
            let client = builder
                .build()
                .err_tip(|| "Failed to build client in RecoverablePool::new")?;
            clients.push(client);
        }
        Ok(Self {
            clients: Arc::new(RwLock::new(clients)),
            builder,
            counter: Arc::new(core::sync::atomic::AtomicUsize::new(0)),
        })
    }

    fn connect(&self) {
        let clients = self.clients.read();
        for client in clients.iter() {
            client.connect();
        }
    }

    fn next(&self) -> Client {
        let clients = self.clients.read();
        let index = self
            .counter
            .fetch_add(1, core::sync::atomic::Ordering::Relaxed);
        clients[index % clients.len()].clone()
    }

    async fn replace_client(&self, old_client: &Client) -> Result<Client, Error> {
        {
            let clients = self.clients.read();
            if !clients.iter().any(|c| c.id() == old_client.id()) {
                // Someone else swapped this client already; just hand out the next pooled one.
                return Ok(self.next());
            }
        }

        let new_client = self
            .builder
            .build()
            .err_tip(|| "Failed to build new client in RecoverablePool::replace_client")?;
        new_client.connect();
        new_client.wait_for_connect().await.err_tip(|| {
            format!(
                "Failed to connect new client while replacing Redis client {}",
                old_client.id()
            )
        })?;

        let replaced_client = {
            let mut clients = self.clients.write();
            clients
                .iter()
                .position(|c| c.id() == old_client.id())
                .map(|index| core::mem::replace(&mut clients[index], new_client.clone()))
        };

        if let Some(old_client) = replaced_client {
            let _unused = old_client.quit().await;
            info!("Replaced Redis client {}", old_client.id());
            Ok(new_client)
        } else {
            // Second race: pool entry changed after we connected the new client.
            let _unused = new_client.quit().await;
            Ok(self.next())
        }
    }
}
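
// A minimal usage sketch of the pool above (hedged; `builder` stands in for a
// configured `fred::types::Builder` and is not defined here):
//
//     let pool = RecoverablePool::new(builder, 3)?; // three round-robin clients
//     pool.connect();                               // begin connecting all of them
//     let client = pool.next();                     // grab the next client in rotation
//     // If that client's connection is broken, swap in a freshly built one:
//     let client = pool.replace_client(&client).await?;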

/// A [`StoreDriver`] implementation that uses Redis as a backing store.
#[derive(Debug, MetricsComponent)]
pub struct RedisStore {
    /// The client pool connecting to the backing Redis instance(s).
    client_pool: RecoverablePool,

    /// A channel to publish updates to when a key is added, removed, or modified.
    #[metric(
        help = "The pubsub channel to publish updates to when a key is added, removed, or modified"
    )]
    pub_sub_channel: Option<String>,

    /// A redis client for managing subscriptions.
    /// TODO: This should be moved into the store in followups once a standard use pattern has been determined.
    subscriber_client: SubscriberClient,

    /// A function used to generate names for temporary keys.
    temp_name_generator_fn: fn() -> String,

    /// A common prefix to prepend to all keys before they are sent to Redis.
    ///
    /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`).
    #[metric(help = "Prefix to prepend to all keys before sending to Redis")]
    key_prefix: String,

    /// The amount of data to read from Redis at a time.
    #[metric(help = "The amount of data to read from Redis at a time")]
    read_chunk_size: usize,

    /// The maximum number of chunk uploads per update.
    /// This is used to limit the number of concurrent chunk uploads per update.
    #[metric(help = "The maximum number of chunk uploads per update")]
    max_chunk_uploads_per_update: usize,

    /// The COUNT value passed when scanning keys in Redis.
    /// This is used to hint the amount of work that should be done per response.
    #[metric(help = "The COUNT value passed when scanning keys in Redis")]
    scan_count: u32,

    /// Redis script used to update a value in redis if the version matches.
    /// This is done by incrementing the version number and then setting the new data
    /// only if the version number matches the existing version number.
    update_if_version_matches_script: Script,

    /// A manager for subscriptions to keys in Redis.
    subscription_manager: Mutex<Option<Arc<RedisSubscriptionManager>>>,

    /// Permits to limit inflight Redis requests. Technically this only
    /// limits the calls to `get_client()`, but the requests per client
    /// are small enough that it works well enough.
    client_permits: Arc<Semaphore>,
}

struct ClientWithPermit {
    client: Client,

    // Held here so the permit stays alive alongside the client and is only
    // released when the client is dropped.
    #[allow(dead_code)]
    semaphore_permit: OwnedSemaphorePermit,
}

impl Drop for ClientWithPermit {
    fn drop(&mut self) {
        trace!(
            remaining = self.semaphore_permit.semaphore().available_permits(),
            "Dropping a client permit"
        );
    }
}

impl RedisStore {
    /// Create a new `RedisStore` from the given configuration.
    pub fn new(mut spec: RedisSpec) -> Result<Arc<Self>, Error> {
        if spec.addresses.is_empty() {
            return Err(make_err!(
                Code::InvalidArgument,
                "No addresses were specified in redis store configuration."
            ));
        }
        let [addr] = spec.addresses.as_slice() else {
            return Err(make_err!(
                Code::Unimplemented,
                "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discovery to find the other nodes."
            ));
        };
        let redis_config = match spec.mode {
            RedisMode::Cluster => RedisConfig::from_url_clustered(addr),
            RedisMode::Sentinel => RedisConfig::from_url_sentinel(addr),
            RedisMode::Standard => RedisConfig::from_url_centralized(addr),
        }
        .err_tip_with_code(|e| {
            (
                Code::InvalidArgument,
                format!("while parsing redis node address: {e}"),
            )
        })?;

        let reconnect_policy = {
            if spec.retry.delay == 0.0 {
                spec.retry.delay = DEFAULT_RETRY_DELAY;
            }
            if spec.retry.jitter == 0.0 {
                spec.retry.jitter = DEFAULT_RETRY_JITTER;
            }

            let to_ms = |secs: f32| -> u32 {
                Duration::from_secs_f32(secs)
                    .as_millis()
                    .try_into()
                    .unwrap_or(u32::MAX)
            };

            let max_retries = u32::try_from(spec.retry.max_retries)
                .err_tip(|| "max_retries could not be converted to u32 in RedisStore::new")?;

            let min_delay_ms = to_ms(spec.retry.delay);
            let max_delay_ms = 8000;
            let jitter = to_ms(spec.retry.jitter * spec.retry.delay);

            let mut reconnect_policy =
                ReconnectPolicy::new_exponential(max_retries, min_delay_ms, max_delay_ms, 2);
            reconnect_policy.set_jitter(jitter);
            reconnect_policy
        };
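
        // Worked example with the defaults above (a sketch of the math, not new
        // behavior): delay = 0.1s and jitter = 0.5 yield min_delay_ms = 100 and
        // jitter = to_ms(0.5 * 0.1) = 50, so reconnect delays grow roughly as
        // 100ms, 200ms, 400ms, ... (base 2), each with up to ~50ms of jitter,
        // capped at max_delay_ms = 8000.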

        {
            if spec.broadcast_channel_capacity == 0 {
                spec.broadcast_channel_capacity = DEFAULT_BROADCAST_CHANNEL_CAPACITY;
            }
            if spec.connection_timeout_ms == 0 {
                spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS;
            }
            if spec.command_timeout_ms == 0 {
                spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS;
            }
            if spec.connection_pool_size == 0 {
                spec.connection_pool_size = DEFAULT_CONNECTION_POOL_SIZE;
            }
            if spec.read_chunk_size == 0 {
                spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
            }
            if spec.max_chunk_uploads_per_update == 0 {
                spec.max_chunk_uploads_per_update = DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE;
            }
            if spec.scan_count == 0 {
                spec.scan_count = DEFAULT_SCAN_COUNT;
            }
            if spec.max_client_permits == 0 {
                spec.max_client_permits = DEFAULT_CLIENT_PERMITS;
            }
        }
        let connection_timeout = Duration::from_millis(spec.connection_timeout_ms);
        let command_timeout = Duration::from_millis(spec.command_timeout_ms);

        let mut builder = Builder::from_config(redis_config);
        builder
            .set_performance_config(PerformanceConfig {
                default_command_timeout: command_timeout,
                broadcast_channel_capacity: spec.broadcast_channel_capacity,
                ..Default::default()
            })
            .set_connection_config(ConnectionConfig {
                connection_timeout,
                internal_command_timeout: command_timeout,
                unresponsive: UnresponsiveConfig {
                    max_timeout: Some(connection_timeout),
                    // This number needs to be less than the connection timeout.
                    // We use 4 as it is a good balance between not spamming the server
                    // and not waiting too long.
                    interval: connection_timeout / 4,
                },
                ..Default::default()
            })
            .set_policy(reconnect_policy);

        let client_pool = RecoverablePool::new(builder.clone(), spec.connection_pool_size)
            .err_tip(|| "while creating redis connection pool")?;

        let subscriber_client = builder
            .build_subscriber_client()
            .err_tip(|| "while creating redis subscriber client")?;

        Self::new_from_builder_and_parts(
            client_pool,
            subscriber_client,
            spec.experimental_pub_sub_channel.clone(),
            || Uuid::new_v4().to_string(),
            spec.key_prefix.clone(),
            spec.read_chunk_size,
            spec.max_chunk_uploads_per_update,
            spec.scan_count,
            spec.max_client_permits,
        )
        .map(Arc::new)
    }

    /// Used for testing when determinism is required.
    #[expect(clippy::too_many_arguments)]
    pub fn new_from_builder_and_parts(
        client_pool: RecoverablePool,
        subscriber_client: SubscriberClient,
        pub_sub_channel: Option<String>,
        temp_name_generator_fn: fn() -> String,
        key_prefix: String,
        read_chunk_size: usize,
        max_chunk_uploads_per_update: usize,
        scan_count: u32,
        max_client_permits: usize,
    ) -> Result<Self, Error> {
        // Start connection pool (this will retry forever by default).
        client_pool.connect();
        subscriber_client.connect();

        info!("Redis index fingerprint: {FINGERPRINT_CREATE_INDEX_HEX}");

        Ok(Self {
            client_pool,
            pub_sub_channel,
            subscriber_client,
            temp_name_generator_fn,
            key_prefix,
            read_chunk_size,
            max_chunk_uploads_per_update,
            scan_count,
            update_if_version_matches_script: Script::from_lua(LUA_VERSION_SET_SCRIPT),
            subscription_manager: Mutex::new(None),
            client_permits: Arc::new(Semaphore::new(max_client_permits)),
        })
    }

    async fn get_client(&self) -> Result<ClientWithPermit, Error> {
        let mut client = self.client_pool.next();
        loop {
            let config = client.client_config();
            if config.mocks.is_some() {
                break;
            }
            let connection_info = format!(
                "Connection issue connecting to redis server with hosts: {:?}, username: {}, database: {}",
                config
                    .server
                    .hosts()
                    .iter()
                    .map(|s| format!("{}:{}", s.host, s.port))
                    .collect::<Vec<String>>(),
                config
                    .username
                    .clone()
                    .unwrap_or_else(|| "None".to_string()),
                config.database.unwrap_or_default()
            );
            match client.wait_for_connect().await {
                Ok(()) => break,
                Err(e) => {
                    warn!("{connection_info}: {e:?}. Replacing client.");
                    client = self
                        .client_pool
                        .replace_client(&client)
                        .await
                        .err_tip(|| connection_info.clone())?;
                }
            }
        }
        let local_client_permits = self.client_permits.clone();
        let remaining = local_client_permits.available_permits();
        let semaphore_permit = local_client_permits.acquire_owned().await?;
        trace!(remaining, "Got a client permit");
        Ok(ClientWithPermit {
            client,
            semaphore_permit,
        })
    }
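
    // A hedged sketch of how the permit limiting above behaves for a caller
    // (the key name is illustrative): the permit acquired in `get_client()`
    // rides along inside the returned `ClientWithPermit`, so at most
    // `max_client_permits` callers can hold a client at once.
    //
    //     let client = store.get_client().await?; // may wait for a permit
    //     client.client.strlen::<u64, _>("some-key").await?;
    //     drop(client); // releases the permit to the next waiter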

    /// Encode a [`StoreKey`] so it can be sent to Redis.
    fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> {
        let key_body = key.as_str();
        if self.key_prefix.is_empty() {
            key_body
        } else {
            // This is in the hot path for all redis operations, so we try to reuse the allocation
            // from `key.as_str()` if possible.
            match key_body {
                Cow::Owned(mut encoded_key) => {
                    encoded_key.insert_str(0, &self.key_prefix);
                    Cow::Owned(encoded_key)
                }
                Cow::Borrowed(body) => {
                    let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());
                    encoded_key.push_str(&self.key_prefix);
                    encoded_key.push_str(body);
                    Cow::Owned(encoded_key)
                }
            }
        }
    }
}
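
// A small illustration of `encode_key` (hedged; the values are made up): with
// `key_prefix = "nl/"`, a key whose string form is "abc123" is sent to Redis
// as "nl/abc123"; with an empty prefix the key passes through unchanged and no
// extra allocation is made.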

#[async_trait]
impl StoreDriver for RedisStore {
    async fn has_with_results(
        self: Pin<&Self>,
        keys: &[StoreKey<'_>],
        results: &mut [Option<u64>],
    ) -> Result<(), Error> {
        // TODO(palfrey) We could use pipeline here, but it makes retry more
        // difficult and it doesn't work very well in cluster mode.
        // If we wanted to optimize this with pipeline be careful to
        // implement retry and to support cluster mode.

        let client = self.get_client().await?;

        // If we ask for many keys in one go, this can time out, so limit how many we request at once.
        let max_in_one_go = Arc::new(Semaphore::const_new(5));

        izip!(
            keys.iter(),
            results.iter_mut(),
            iter::repeat(&max_in_one_go),
            iter::repeat(&client)
        )
        .map(|(key, result, local_semaphore, client)| async move {
            // We need to do a special pass to ensure our zero key exists.
            if is_zero_digest(key.borrow()) {
                *result = Some(0);
                return Ok::<_, Error>(());
            }
            let encoded_key = self.encode_key(key);

            let guard = local_semaphore.acquire().await?;

            let pipeline = client.client.pipeline();
            pipeline
                .strlen::<(), _>(encoded_key.as_ref())
                .await
                .err_tip(|| format!("In RedisStore::has_with_results::strlen for {encoded_key}"))?;
            // Redis returns 0 when the key doesn't exist
            // AND when the key exists with a value of length 0.
            // Therefore, we need to check both length and existence
            // and do it in a pipeline for efficiency.
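            // For example (illustrative): a pipeline result of (0, true) means
            // an empty blob exists, (0, false) means the key is missing, and
            // (n > 0, true) means an n-byte blob exists.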
            pipeline
                .exists::<(), _>(encoded_key.as_ref())
                .await
                .err_tip(|| format!("In RedisStore::has_with_results::exists for {encoded_key}"))?;
            let (blob_len, exists) = pipeline
                .all::<(u64, bool)>()
                .await
                .err_tip(|| "In RedisStore::has_with_results::all")?;

            *result = if exists { Some(blob_len) } else { None };

            drop(guard);

            Ok::<_, Error>(())
        })
        .collect::<FuturesUnordered<_>>()
        .try_collect()
        .await
    }

    async fn list(
        self: Pin<&Self>,
        range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
        handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
    ) -> Result<u64, Error> {
        let range = (
            range.0.map(StoreKey::into_owned),
            range.1.map(StoreKey::into_owned),
        );
        let pattern = match range.0 {
            Bound::Included(ref start) | Bound::Excluded(ref start) => match range.1 {
                Bound::Included(ref end) | Bound::Excluded(ref end) => {
                    let start = start.as_str();
                    let end = end.as_str();
                    let max_length = start.len().min(end.len());
                    let length = start
                        .chars()
                        .zip(end.chars())
                        .position(|(a, b)| a != b)
                        .unwrap_or(max_length);
                    format!("{}{}*", self.key_prefix, &start[..length])
                }
                Bound::Unbounded => format!("{}*", self.key_prefix),
            },
            Bound::Unbounded => format!("{}*", self.key_prefix),
        };
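
        // Worked example of the pattern above (keys are made up): for the range
        // "abc100".."abc999" the first differing character is at index 3, so the
        // common prefix is "abc"; with key_prefix "nl/" we SCAN with MATCH
        // "nl/abc*" and rely on the `range.contains` check below to drop any
        // scanned keys that fall outside the bounds.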
        let client = self.get_client().await?;
        let mut scan_stream = client.client.scan(pattern, Some(self.scan_count), None);
        let mut iterations = 0;
        'outer: while let Some(mut page) = scan_stream.try_next().await? {
            if let Some(keys) = page.take_results() {
                for key in keys {
                    // TODO: Notification of conversion errors.
                    // Any results that do not conform to expectations are ignored.
                    if let Some(key) = key.as_str() {
                        if let Some(key) = key.strip_prefix(&self.key_prefix) {
                            let key = StoreKey::new_str(key);
                            if range.contains(&key) {
                                iterations += 1;
                                if !handler(&key) {
                                    break 'outer;
                                }
                            }
                        }
                    }
                }
            }
            page.next();
        }
        Ok(iterations)
    }

    async fn update(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        mut reader: DropCloserReadHalf,
        _upload_size: UploadSizeInfo,
    ) -> Result<(), Error> {
        let final_key = self.encode_key(&key);

        // While the name generation function can be supplied by the user, we need to have the curly
        // braces in place in order to manage redis' hashing behavior and make sure that the temporary
        // key name and the final key name are directed to the same cluster node. See
        // https://redis.io/blog/redis-clustering-best-practices-with-keys/
        //
        // The TL;DR is that if we're in cluster mode and the names hash differently, we can't use request
        // pipelining. By using these braces, we tell redis to only hash the part of the temporary key that's
        // identical to the final key -- so they will always hash to the same node.
        let temp_key = format!(
            "temp-{}-{{{}}}",
            (self.temp_name_generator_fn)(),
            &final_key
        );
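
        // For example (UUID shortened for illustration): for a final key of
        // "nl/abc123" the temp key looks like "temp-550e8400-{nl/abc123}".
        // Cluster hashing only considers the part inside the braces, so both
        // keys hash to the same node and the RENAME below stays on one node.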

        if is_zero_digest(key.borrow()) {
            let chunk = reader
                .peek()
                .await
                .err_tip(|| "Failed to peek in RedisStore::update")?;
            if chunk.is_empty() {
                reader
                    .drain()
                    .await
                    .err_tip(|| "Failed to drain in RedisStore::update")?;
                // Zero-digest keys are special -- we don't need to do anything with them.
                return Ok(());
            }
        }

        let client = self.get_client().await?;

        let mut read_stream = reader
            .scan(0u32, |bytes_read, chunk_res| {
                future::ready(Some(
                    chunk_res
                        .err_tip(|| "Failed to read chunk in update in redis store")
                        .and_then(|chunk| {
                            let offset = *bytes_read;
                            let chunk_len = u32::try_from(chunk.len()).err_tip(
                                || "Could not convert chunk length to u32 in RedisStore::update",
                            )?;
                            let new_bytes_read = bytes_read
                                .checked_add(chunk_len)
                                .err_tip(|| "Overflow protection in RedisStore::update")?;
                            *bytes_read = new_bytes_read;
                            Ok::<_, Error>((offset, *bytes_read, chunk))
                        }),
                ))
            })
            .map(|res| {
                let (offset, end_pos, chunk) = res?;
                let temp_key_ref = &temp_key;
                let client = client.client.clone();
                Ok(async move {
                    client
                        .setrange::<(), _, _>(temp_key_ref, offset, chunk)
                        .await
                        .err_tip(
                            || format!("While appending to temp key ({temp_key_ref}) in RedisStore::update. offset = {offset}. end_pos = {end_pos}"),
                        )?;
                    Ok::<u32, Error>(end_pos)
                })
            })
            .try_buffer_unordered(self.max_chunk_uploads_per_update);

        let mut total_len: u32 = 0;
        while let Some(last_pos) = read_stream.try_next().await? {
            if last_pos > total_len {
                total_len = last_pos;
            }
        }

        let blob_len = client
            .client
            .strlen::<u64, _>(&temp_key)
            .await
            .err_tip(|| format!("In RedisStore::update strlen check for {temp_key}"))?;
        // This is a safety check to ensure that in the event some kind of retry were to happen
        // and the data was appended to the key twice, we reject the data.
        if blob_len != u64::from(total_len) {
            return Err(make_input_err!(
                "Data length mismatch in RedisStore::update for {}({}) - expected {} bytes, got {} bytes",
                key.borrow().as_str(),
                temp_key,
                total_len,
                blob_len,
            ));
        }

        // Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost.
        client
            .client
            .rename::<(), _, _>(&temp_key, final_key.as_ref())
            .await
            .err_tip(|| "While queueing key rename in RedisStore::update()")?;

        // If we have a publish channel configured, send a notice that the key has been set.
        if let Some(pub_sub_channel) = &self.pub_sub_channel {
            return Ok(client
                .client
                .publish(pub_sub_channel, final_key.as_ref())
                .await?);
        }

        Ok(())
    }

    async fn get_part(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        writer: &mut DropCloserWriteHalf,
        offset: u64,
        length: Option<u64>,
    ) -> Result<(), Error> {
        let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?;
        let length = length
            .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
            .transpose()?;

        // To follow the RBE spec we need to consider any digest with
        // zero size to be existing.
        if is_zero_digest(key.borrow()) {
            return writer
                .send_eof()
                .err_tip(|| "Failed to send zero EOF in redis store get_part");
        }

        let encoded_key = self.encode_key(&key);
        let encoded_key = encoded_key.as_ref();

        // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we
        // do math with indices we change them to be exclusive at the end.

        // We want to read the data at the key from `offset` to `offset + length`.
        let data_start = offset;
        let data_end = data_start
            .saturating_add(length.unwrap_or(isize::MAX as usize))
            .saturating_sub(1);
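
        // Worked example (numbers illustrative): offset = 5 and length = Some(10)
        // give data_start = 5 and data_end = 5 + 10 - 1 = 14, i.e. the inclusive
        // GETRANGE window [5, 14]; with length = None the window extends to the
        // end of the blob.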

        // And we don't ever want to read more than `read_chunk_size` bytes at a time, so we'll need to iterate.
        let mut chunk_start = data_start;
        let mut chunk_end = cmp::min(
            data_start.saturating_add(self.read_chunk_size) - 1,
            data_end,
        );

        let client = self.get_client().await?;
        loop {
            let chunk: Bytes = client
                .client
                .getrange(encoded_key, chunk_start, chunk_end)
                .await
                .err_tip(|| "In RedisStore::get_part::getrange")?;

            let didnt_receive_full_chunk = chunk.len() < self.read_chunk_size;
            let reached_end_of_data = chunk_end == data_end;

            if didnt_receive_full_chunk || reached_end_of_data {
                if !chunk.is_empty() {
                    writer
                        .send(chunk)
                        .await
                        .err_tip(|| "Failed to write data in RedisStore::get_part")?;
                }

                break; // No more data to read.
            }

            // We received a full chunk's worth of data, so write it...
            writer
                .send(chunk)
                .await
                .err_tip(|| "Failed to write data in RedisStore::get_part")?;

            // ...and go grab the next chunk.
            chunk_start = chunk_end + 1;
            chunk_end = cmp::min(
                chunk_start.saturating_add(self.read_chunk_size) - 1,
                data_end,
            );
        }

        // If we didn't write any data, check if the key exists; if not, return a NotFound error.
        // This is required by the spec.
        if writer.get_bytes_written() == 0 {
            // We're supposed to read 0 bytes, so just check if the key exists.
            let exists = client
                .client
                .exists::<bool, _>(encoded_key)
                .await
                .err_tip(|| "In RedisStore::get_part::zero_exists")?;

            if !exists {
                return Err(make_err!(
                    Code::NotFound,
                    "Data not found in Redis store for digest: {key:?}"
                ));
            }
        }

        writer
            .send_eof()
            .err_tip(|| "Failed to write EOF in redis store get_part")
    }

    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }

    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) {
        self
    }

    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> {
        self
    }

    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
        registry.register_indicator(self);
    }

    fn register_remove_callback(
        self: Arc<Self>,
        _callback: Arc<dyn RemoveItemCallback>,
    ) -> Result<(), Error> {
        // As redis doesn't drop stuff, we can just ignore this.
        Ok(())
    }
}

#[async_trait]
impl HealthStatusIndicator for RedisStore {
    fn get_name(&self) -> &'static str {
        "RedisStore"
    }

    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
        StoreDriver::check_health(Pin::new(self), namespace).await
    }
}

// -------------------------------------------------------------------
// Below this line is specific to the redis scheduler implementation.
// -------------------------------------------------------------------

/// The maximum number of results to return per cursor.
const MAX_COUNT_PER_CURSOR: u64 = 256;
/// The time in milliseconds that a redis cursor can be idle before it is closed.
const CURSOR_IDLE_MS: u64 = 2_000;
/// The name of the field in the Redis hash that stores the data.
const DATA_FIELD_NAME: &str = "data";
/// The name of the field in the Redis hash that stores the version.
const VERSION_FIELD_NAME: &str = "version";
/// The time to live of indexes in seconds. After this time redis may delete the index.
const INDEX_TTL_S: u64 = 60 * 60 * 24; // 24 hours.

/// Lua script to set a key if the version matches.
/// Args:
///   KEYS[1]: The key where the version is stored.
///   ARGV[1]: The expected version.
///   ARGV[2]: The new data.
///   ARGV[3*]: Key-value pairs of additional data to include.
/// Returns:
///   A pair `(success, version)`: `{ 1, new_version }` if the expected version
///   matched and the value was set, otherwise `{ 0, current_version }`.
const LUA_VERSION_SET_SCRIPT: &str = formatcp!(
    r"
local key = KEYS[1]
local expected_version = tonumber(ARGV[1])
local new_data = ARGV[2]
local new_version = redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', 1)
local i
local indexes = {{}}

if new_version-1 ~= expected_version then
    redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', -1)
    return {{ 0, new_version-1 }}
end
-- Skip the first 2 ARGVs, as they are known inputs.
-- Remember: Lua is 1-indexed.
for i=3, #ARGV do
    indexes[i-2] = ARGV[i]
end

-- In testing we witnessed redis sometimes not update our FT indexes,
-- resulting in stale data. It appears that if we delete our keys and then
-- insert them again it works, which reduces the risk significantly.
redis.call('DEL', key)
redis.call('HSET', key, '{DATA_FIELD_NAME}', new_data, '{VERSION_FIELD_NAME}', new_version, unpack(indexes))

return {{ 1, new_version }}
"
);
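
// A worked example of the compare-and-swap above (values illustrative): if the
// hash currently holds version = 4 and the caller passes expected_version = 4,
// HINCRBY bumps the version to 5, the key is rewritten with the new data, and
// { 1, 5 } is returned. If the caller instead passes expected_version = 3, the
// increment is rolled back and { 0, 4 } is returned so the caller can re-read.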

/// This is the output of the calculations below hardcoded into the executable.
const FINGERPRINT_CREATE_INDEX_HEX: &str = "3e762c15";

#[cfg(test)]
mod test {
    use super::FINGERPRINT_CREATE_INDEX_HEX;

    /// String of the `FT.CREATE` command used to create the index template.
    const CREATE_INDEX_TEMPLATE: &str = "FT.CREATE {} ON HASH PREFIX 1 {} NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA {} TAG CASESENSITIVE SORTABLE";

    /// Compile-time fingerprint of the `FT.CREATE` command used to create the
    /// index template. This is a simple CRC32 checksum of the command string.
    /// We don't care about it actually being a valid CRC32 checksum, just that
    /// it's a unique identifier with a low chance of collision.
    const fn fingerprint_create_index_template() -> u32 {
        const POLY: u32 = 0xEDB8_8320;
        const DATA: &[u8] = CREATE_INDEX_TEMPLATE.as_bytes();
        let mut crc = 0xFFFF_FFFF;
        let mut i = 0;
        while i < DATA.len() {
            let byte = DATA[i];
            crc ^= byte as u32;

            let mut j = 0;
            while j < 8 {
                crc = if crc & 1 != 0 {
                    (crc >> 1) ^ POLY
                } else {
                    crc >> 1
                };
                j += 1;
            }
            i += 1;
        }
        crc
    }

    /// Verify that our calculation always evaluates to this fixed value.
    #[test]
    fn test_fingerprint_value() {
        assert_eq!(
            format!("{:08x}", &fingerprint_create_index_template()),
            FINGERPRINT_CREATE_INDEX_HEX,
        );
    }
}

/// Get the name of the index to create for the given field.
/// This adds some prefix data to the name to try and ensure that,
/// if the index definition changes, the index will get a new name.
macro_rules! get_index_name {
    ($prefix:expr, $field:expr, $maybe_sort:expr) => {
        format_args!(
            "{}_{}_{}_{}",
            $prefix,
            $field,
            $maybe_sort.unwrap_or(""),
            FINGERPRINT_CREATE_INDEX_HEX
        )
    };
}

/// Try to sanitize a string to be used as a Redis key.
/// We don't actually modify the string, just check if it's valid.
const fn try_sanitize(s: &str) -> Option<&str> {
    // Note: We cannot use for loops or iterators here because they are not const.
    // Allowing us to use a const function here gives the compiler the ability to
    // optimize this function away entirely in the case where the input is constant.
    let chars = s.as_bytes();
    let mut i: usize = 0;
    let len = s.len();
    loop {
        if i >= len {
            break;
        }
        let c = chars[i];
        if !c.is_ascii_alphanumeric() && c != b'_' {
            return None;
        }
        i += 1;
    }
    Some(s)
}
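
// A quick illustration of `try_sanitize` (inputs made up): only ASCII
// alphanumerics and '_' are accepted.
//
//     assert_eq!(try_sanitize("operation_id_42"), Some("operation_id_42"));
//     assert_eq!(try_sanitize("operation-id"), None); // '-' is rejected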

/// An individual subscription to a key in Redis.
#[derive(Debug)]
pub struct RedisSubscription {
    receiver: Option<tokio::sync::watch::Receiver<String>>,
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
}

impl SchedulerSubscription for RedisSubscription {
    /// Wait for the subscription key to change.
    async fn changed(&mut self) -> Result<(), Error> {
        let receiver = self
            .receiver
            .as_mut()
            .ok_or_else(|| make_err!(Code::Internal, "In RedisSubscription::changed::as_mut"))?;
        receiver
            .changed()
            .await
            .map_err(|_| make_err!(Code::Internal, "In RedisSubscription::changed::changed"))
    }
}

// If the subscription is dropped, we need to possibly remove the key from the
// subscribed keys map.
impl Drop for RedisSubscription {
    fn drop(&mut self) {
        let Some(receiver) = self.receiver.take() else {
            warn!("RedisSubscription has already been dropped, nothing to do.");
            return; // Already dropped, nothing to do.
        };
        let key = receiver.borrow().clone();
        // IMPORTANT: This must be dropped before receiver_count() is called.
        drop(receiver);
        let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else {
            return; // Already dropped, nothing to do.
        };
        let mut subscribed_keys = subscribed_keys.write();
        let Some(value) = subscribed_keys.get(&key) else {
            error!(
                "Key {key} was not found in subscribed keys when checking if it should be removed."
            );
            return;
        };
        // If we have no receivers, clean up the entry from our map.
        if value.receiver_count() == 0 {
            subscribed_keys.remove(key);
        }
    }
}

/// A publisher for a key in Redis.
#[derive(Debug)]
struct RedisSubscriptionPublisher {
    sender: Mutex<tokio::sync::watch::Sender<String>>,
}

impl RedisSubscriptionPublisher {
    fn new(
        key: String,
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
    ) -> (Self, RedisSubscription) {
        let (sender, receiver) = tokio::sync::watch::channel(key);
        let publisher = Self {
            sender: Mutex::new(sender),
        };
        let subscription = RedisSubscription {
            receiver: Some(receiver),
            weak_subscribed_keys,
        };
        (publisher, subscription)
    }

    fn subscribe(
        &self,
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
    ) -> RedisSubscription {
        let receiver = self.sender.lock().subscribe();
        RedisSubscription {
            receiver: Some(receiver),
            weak_subscribed_keys,
        }
    }

    fn receiver_count(&self) -> usize {
        self.sender.lock().receiver_count()
    }

    fn notify(&self) {
        // TODO(https://github.com/sile/patricia_tree/issues/40) When this is addressed
        // we can remove the `Mutex` and use the mutable iterator directly.
        self.sender.lock().send_modify(|_| {});
    }
}
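
// A minimal sketch of the watch-channel mechanics used above (standalone and
// hedged; not part of this store): `send_modify` marks the value changed even
// when nothing is written, which is how `notify()` wakes every
// `RedisSubscription::changed()` call awaiting on a receiver.
//
//     let (tx, mut rx) = tokio::sync::watch::channel(String::from("key"));
//     tx.send_modify(|_| {});              // wake all receivers; value untouched
//     assert!(rx.changed().await.is_ok()); // resolves after the notify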

#[derive(Debug)]
pub struct RedisSubscriptionManager {
    subscribed_keys: Arc<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
    tx_for_test: tokio::sync::mpsc::UnboundedSender<String>,
    _subscription_spawn: JoinHandleDropGuard<()>,
}

impl RedisSubscriptionManager {
    pub fn new(subscribe_client: SubscriberClient, pub_sub_channel: String) -> Self {
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);
        let (tx_for_test, mut rx_for_test) = tokio::sync::mpsc::unbounded_channel();
        Self {
            subscribed_keys,
            tx_for_test,
            _subscription_spawn: spawn!("redis_subscribe_spawn", async move {
                let mut rx = subscribe_client.message_rx();
                loop {
                    if let Err(e) = subscribe_client.subscribe(&pub_sub_channel).await {
                        error!("Error subscribing to pattern - {e}");
                        return;
                    }
                    let mut reconnect_rx = subscribe_client.reconnect_rx();
                    let reconnect_fut = reconnect_rx.recv().fuse();
                    tokio::pin!(reconnect_fut);
                    loop {
                        let key = select! {
                            value = rx_for_test.recv() => {
                                let Some(value) = value else {
                                    unreachable!("Channel should never close");
                                };
                                value.into()
                            },
                            msg = rx.recv() => {
                                match msg {
                                    Ok(msg) => {
                                        if let RedisValue::String(s) = msg.value {
                                            s
                                        } else {
                                            error!("Received non-string message in RedisSubscriptionManager");
                                            continue;
                                        }
                                    },
                                    Err(e) => {
                                        // Check to see if our parent has been dropped and if so kill the spawn.
                                        if subscribed_keys_weak.upgrade().is_none() {
                                            warn!("It appears our parent has been dropped, exiting RedisSubscriptionManager spawn");
                                            return;
                                        }
                                        error!("Error receiving message in RedisSubscriptionManager, reconnecting and flagging everything changed - {e}");
                                        break;
                                    }
                                }
                            },
                            _ = &mut reconnect_fut => {
                                warn!("Redis reconnected, flagging all subscriptions as changed and resuming");
                                break;
                            }
                        };
                        let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
                            warn!(
                                "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
                            );
                            return;
                        };
                        let subscribed_keys_mux = subscribed_keys.read();
                        subscribed_keys_mux
                            .common_prefix_values(&*key)
                            .for_each(RedisSubscriptionPublisher::notify);
                    }
                    // Sleep for a small amount of time to ensure we don't reconnect too quickly.
                    sleep(Duration::from_secs(1)).await;
                    // If we reconnect or lag behind we might have had dirty keys, so we need to
                    // flag all of them as changed.
                    let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
                        warn!(
                            "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
                        );
                        return;
                    };
                    let subscribed_keys_mux = subscribed_keys.read();
                    // Just in case, also get a new receiver.
                    rx = subscribe_client.message_rx();
                    // Drop all buffered messages, then flag everything as changed.
                    rx.resubscribe();
                    for publisher in subscribed_keys_mux.values() {
                        publisher.notify();
                    }
                }
            }),
        }
    }
}

impl SchedulerSubscriptionManager for RedisSubscriptionManager {
    type Subscription = RedisSubscription;

    fn notify_for_test(&self, value: String) {
        self.tx_for_test.send(value).unwrap();
    }

    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
    where
        K: SchedulerStoreKeyProvider,
    {
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
        let mut subscribed_keys = self.subscribed_keys.write();
        let key = key.get_key();
        let key_str = key.as_str();
        let mut subscription = if let Some(publisher) = subscribed_keys.get(&key_str) {
            publisher.subscribe(weak_subscribed_keys)
        } else {
            let (publisher, subscription) =
                RedisSubscriptionPublisher::new(key_str.to_string(), weak_subscribed_keys);
            subscribed_keys.insert(key_str, publisher);
            subscription
        };
        subscription
            .receiver
            .as_mut()
            .ok_or_else(|| {
                make_err!(
                    Code::Internal,
                    "Receiver should be set in RedisSubscriptionManager::subscribe"
                )
            })?
            .mark_changed();

        Ok(subscription)
    }

    fn is_reliable() -> bool {
        false
    }
}

impl SchedulerStore for RedisStore {
    type SubscriptionManager = RedisSubscriptionManager;

    fn subscription_manager(&self) -> Result<Arc<RedisSubscriptionManager>, Error> {
        let mut subscription_manager = self.subscription_manager.lock();

        if let Some(subscription_manager) = &*subscription_manager {
            Ok(subscription_manager.clone())
        } else {
            let Some(pub_sub_channel) = &self.pub_sub_channel else {
                return Err(make_input_err!(
                    "RedisStore must have a pubsub channel for a Redis Scheduler if using subscriptions"
                ));
            };
            let sub = Arc::new(RedisSubscriptionManager::new(
                self.subscriber_client.clone(),
                pub_sub_channel.clone(),
            ));
            *subscription_manager = Some(sub.clone());
            Ok(sub)
        }
    }
1228
1229
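
`subscription_manager` caches the manager behind the same mutex it checks, so concurrent callers either reuse the existing `Arc` or exactly one of them constructs it. A minimal sketch of that get-or-init shape, assuming a `parking_lot::Mutex<Option<Arc<T>>>` slot like the field used here:

```rust
use std::sync::Arc;
use parking_lot::Mutex;

// Sketch: the lock is held across both the check and the store, so only
// one value is ever created even under concurrent callers.
fn get_or_init<T>(slot: &Mutex<Option<Arc<T>>>, make: impl FnOnce() -> T) -> Arc<T> {
    let mut guard = slot.lock();
    if let Some(existing) = &*guard {
        existing.clone()
    } else {
        let created = Arc::new(make());
        *guard = Some(created.clone());
        created
    }
}
```
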
    async fn update_data<T>(&self, data: T) -> Result<Option<i64>, Error>
    where
        T: SchedulerStoreDataProvider
            + SchedulerStoreKeyProvider
            + SchedulerCurrentVersionProvider
            + Send,
    {
        let key = data.get_key();
        let redis_key = self.encode_key(&key);
        let client = self.get_client().await?;
        let maybe_index = data.get_indexes().err_tip(|| {
            format!("Err getting index in RedisStore::update_data::versioned for {redis_key}")
        })?;
        if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {
            let current_version = data.current_version();
            let data = data.try_into_bytes().err_tip(|| {
                format!("Could not convert value to bytes in RedisStore::update_data::versioned for {redis_key}")
            })?;
            let mut argv = Vec::with_capacity(3 + maybe_index.len() * 2);
            argv.push(Bytes::from(format!("{current_version}")));
            argv.push(data);
            for (name, value) in maybe_index {
                argv.push(Bytes::from_static(name.as_bytes()));
                argv.push(value);
            }
            let start = std::time::Instant::now();

            let (success, new_version): (bool, i64) = self
                .update_if_version_matches_script
                .evalsha_with_reload(&client.client, vec![redis_key.as_ref()], argv)
                .await
                .err_tip(|| format!("In RedisStore::update_data::versioned for {key:?}"))?;

            let elapsed = start.elapsed();

            if elapsed > Duration::from_millis(100) {
                warn!(
                    %redis_key,
                    ?elapsed,
                    "Slow Redis version-set operation"
                );
            }
            if !success {
                warn!(
                    %redis_key,
                    %key,
                    %current_version,
                    %new_version,
                    caller = core::any::type_name::<T>(),
                    "Redis version conflict - optimistic lock failed"
                );
                return Ok(None);
            }
            trace!(
                %redis_key,
                %key,
                old_version = %current_version,
                %new_version,
                "Updated redis key to new version"
            );
            // If we have a publish channel configured, send a notice that the key has been set.
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
                return Ok(client
                    .client
                    .publish(pub_sub_channel, redis_key.as_ref())
                    .await?);
            }
            Ok(Some(new_version))
        } else {
            let data = data.try_into_bytes().err_tip(|| {
                format!("Could not convert value to bytes in RedisStore::update_data::noversion for {redis_key}")
            })?;
            let mut fields = RedisMap::new();
            fields.reserve(1 + maybe_index.len());
            fields.insert(DATA_FIELD_NAME.into(), data.into());
            for (name, value) in maybe_index {
                fields.insert(name.into(), value.into());
            }
            client
                .client
                .hset::<(), _, _>(redis_key.as_ref(), fields)
                .await
                .err_tip(|| format!("In RedisStore::update_data::noversion for {redis_key}"))?;
            // If we have a publish channel configured, send a notice that the key has been set.
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
                return Ok(client
                    .client
                    .publish(pub_sub_channel, redis_key.as_ref())
                    .await?);
            }
            Ok(Some(0)) // Always use "0" version since this is not a versioned request.
        }
    }
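
The versioned branch delegates the compare-and-swap to `update_if_version_matches_script`, a Lua script executed atomically server-side via EVALSHA (reloaded automatically if the server's script cache was flushed). The script body is defined elsewhere in this file; purely as an illustration, with field names and return shape assumed rather than taken from the real script, the compare-and-swap it performs could look like:

```rust
// Hypothetical sketch only -- the actual script in this store may differ.
// KEYS[1] is the hash key; ARGV[1] the expected version; ARGV[2] the data;
// remaining ARGV entries are index field/value pairs.
const VERSION_CAS_SKETCH: &str = r#"
local expected = tonumber(ARGV[1])
local current = tonumber(redis.call('HGET', KEYS[1], 'version') or '0')
if current ~= expected then
    return {0, current}
end
local new_version = current + 1
redis.call('HSET', KEYS[1], 'version', new_version, 'data', ARGV[2])
for i = 3, #ARGV, 2 do
    redis.call('HSET', KEYS[1], ARGV[i], ARGV[i + 1])
end
return {1, new_version}
"#;
```

Because the read, compare, and write all happen inside one script invocation, no other client can interleave between them, which is what makes the `(success, new_version)` optimistic-lock result above meaningful under concurrency.
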
    async fn search_by_index_prefix<K>(
        &self,
        index: K,
    ) -> Result<
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
        Error,
    >
    where
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
    {
        let index_value = index.index_value();
        let sanitized_field = try_sanitize(index_value.as_ref())
            .err_tip(|| {
                format!("In RedisStore::search_by_index_prefix::try_sanitize - {index_value:?}")
            })?
            .to_string();
        let index_name = format!(
            "{}",
            get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
        );

        let run_ft_aggregate = |client: Arc<ClientWithPermit>,
                                index_name: String,
                                sanitized_field: String| async move {
            ft_aggregate(
                client.client.clone(),
                index_name,
                if sanitized_field.is_empty() {
                    "*".to_string()
                } else {
                    format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_field)
                },
                FtAggregateOptions {
                    load: Some(Load::Some(vec![
                        SearchField {
                            identifier: DATA_FIELD_NAME.into(),
                            property: None,
                        },
                        SearchField {
                            identifier: VERSION_FIELD_NAME.into(),
                            property: None,
                        },
                    ])),
                    cursor: Some(WithCursor {
                        count: Some(MAX_COUNT_PER_CURSOR),
                        max_idle: Some(CURSOR_IDLE_MS),
                    }),
                    pipeline: vec![AggregateOperation::SortBy {
                        properties: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| {
                            vec![(format!("@{v}").into(), SortOrder::Asc)]
                        }),
                        max: None,
                    }],
                    ..Default::default()
                },
            )
            .await
            .map(|stream| (stream, client))
        };

        let client = Arc::new(self.get_client().await?);
        let (stream, client_guard) = if let Ok(result) =
            run_ft_aggregate(client.clone(), index_name.clone(), sanitized_field.clone()).await
        {
            result
        } else {
            let mut schema = vec![SearchSchema {
                field_name: K::INDEX_NAME.into(),
                alias: None,
                kind: SearchSchemaKind::Tag {
                    sortable: false,
                    unf: false,
                    separator: None,
                    casesensitive: false,
                    withsuffixtrie: false,
                    noindex: false,
                },
            }];
            if let Some(sort_key) = K::MAYBE_SORT_KEY {
                schema.push(SearchSchema {
                    field_name: sort_key.into(),
                    alias: None,
                    kind: SearchSchemaKind::Tag {
                        sortable: true,
                        unf: false,
                        separator: None,
                        casesensitive: false,
                        withsuffixtrie: false,
                        noindex: false,
                    },
                });
            }
            let create_result: Result<(), Error> = {
                let create_client = self.get_client().await?;
                create_client
                    .client
                    .ft_create::<(), _>(
                        index_name.clone(),
                        FtCreateOptions {
                            on: Some(IndexKind::Hash),
                            prefixes: vec![K::KEY_PREFIX.into()],
                            nohl: true,
                            nofields: true,
                            nofreqs: true,
                            nooffsets: true,
                            temporary: Some(INDEX_TTL_S),
                            ..Default::default()
                        },
                        schema,
                    )
                    .await
                    .err_tip(|| {
                        format!(
                            "Error with ft_create in RedisStore::search_by_index_prefix({})",
                            get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
                        )
                    })?;
                Ok(())
            };
            let retry_client = Arc::new(self.get_client().await?);
            let retry_result =
                run_ft_aggregate(retry_client, index_name.clone(), sanitized_field.clone()).await;
            if let Ok(result) = retry_result {
                result
            } else {
                let e: Error = retry_result
                    .err()
                    .expect("Checked for Ok result above")
                    .into();
                let err = match create_result {
                    Ok(()) => e,
                    Err(create_err) => create_err.merge(e),
                };
                return Err(err);
            }
        };

        Ok(stream.map(move |result| {
            let keep_alive = client_guard.clone();
            let _ = &keep_alive;
            let mut redis_map =
                result.err_tip(|| "Error in stream of RedisStore::search_by_index_prefix")?;
            let bytes_data = redis_map
                .remove(&RedisKey::from_static_str(DATA_FIELD_NAME))
                .err_tip(|| "Missing data field in RedisStore::search_by_index_prefix")?
                .into_bytes()
                .err_tip(|| {
                    formatcp!("'{DATA_FIELD_NAME}' is not Bytes in RedisStore::search_by_index_prefix::into_bytes")
                })?;
            let version = if <K as SchedulerIndexProvider>::Versioned::VALUE {
                redis_map
                    .remove(&RedisKey::from_static_str(VERSION_FIELD_NAME))
                    .err_tip(|| "Missing version field in RedisStore::search_by_index_prefix")?
                    .as_i64()
                    .err_tip(|| {
                        formatcp!("'{VERSION_FIELD_NAME}' is not i64 in RedisStore::search_by_index_prefix::as_i64")
                    })?
            } else {
                0
            };
            K::decode(version, bytes_data)
                .err_tip(|| "In RedisStore::search_by_index_prefix::decode")
        }))
    }
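
`search_by_index_prefix` runs the aggregate optimistically and only issues `FT.CREATE` (with `temporary: Some(INDEX_TTL_S)`, so idle indexes expire on their own) when the first attempt fails, then retries exactly once. A minimal sketch of that control flow; `query` and `create_index` are hypothetical stand-ins for the `ft_aggregate` and `ft_create` calls:

```rust
use core::future::Future;

// Sketch of "try, bootstrap on failure, retry once".
async fn query_with_index_bootstrap<T, E, QF, CF>(
    query: impl Fn() -> QF,
    create_index: impl FnOnce() -> CF,
) -> Result<T, E>
where
    QF: Future<Output = Result<T, E>>,
    CF: Future<Output = Result<(), E>>,
{
    if let Ok(result) = query().await {
        return Ok(result);
    }
    // First failure is assumed to mean "index missing": (re)create it.
    let create_result = create_index().await;
    match query().await {
        Ok(result) => Ok(result),
        // The real code merges both errors; this sketch simply prefers the
        // index-creation error when there is one.
        Err(retry_err) => Err(create_result.err().unwrap_or(retry_err)),
    }
}
```
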
    async fn get_and_decode<K>(
        &self,
        key: K,
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
    where
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
    {
        let key = key.get_key();
        let key = self.encode_key(&key);
        let client = self.get_client().await?;
        let (maybe_version, maybe_data) = client
            .client
            .hmget::<(Option<i64>, Option<Bytes>), _, _>(
                key.as_ref(),
                vec![
                    RedisKey::from(VERSION_FIELD_NAME),
                    RedisKey::from(DATA_FIELD_NAME),
                ],
            )
            .await
            .err_tip(|| format!("In RedisStore::get_and_decode {key}"))?;
        let Some(data) = maybe_data else {
            return Ok(None);
        };
        Ok(Some(K::decode(maybe_version.unwrap_or(0), data).err_tip(
            || format!("In RedisStore::get_and_decode::decode {key}"),
        )?))
    }
}
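
`get_and_decode` fetches the version and payload fields in a single HMGET round trip; Redis returns nil for a missing hash field, so an entry written by the unversioned path simply decodes with version 0. A standalone sketch of that access pattern, assuming the same `fred` client API and literal `"version"`/`"data"` field names standing in for `VERSION_FIELD_NAME`/`DATA_FIELD_NAME`:

```rust
use bytes::Bytes;
use fred::interfaces::HashesInterface;
use fred::prelude::{Client, Error};

// Sketch: one round trip for both fields; fred decodes a nil hash field
// to None.
async fn fetch_versioned(client: &Client, key: &str) -> Result<Option<(i64, Bytes)>, Error> {
    let (version, data): (Option<i64>, Option<Bytes>) =
        client.hmget(key, vec!["version", "data"]).await?;
    // No data field means the entry does not exist; no version field means
    // it was written unversioned, so treat it as version 0, mirroring
    // `maybe_version.unwrap_or(0)` above.
    Ok(data.map(|d| (version.unwrap_or(0), d)))
}
```
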