Coverage Report

Created: 2024-11-20 10:13

/build/source/nativelink-store/src/redis_store.rs
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;
use std::cmp;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::time::Duration;

use async_trait::async_trait;
use bytes::Bytes;
use const_format::formatcp;
use fred::clients::{RedisPool, SubscriberClient};
use fred::interfaces::{ClientLike, KeysInterface, PubsubInterface};
use fred::prelude::{EventInterface, HashesInterface, RediSearchInterface};
use fred::types::{
    Builder, ConnectionConfig, FtCreateOptions, PerformanceConfig, ReconnectPolicy, RedisConfig,
    RedisKey, RedisMap, RedisValue, Script, SearchSchema, SearchSchemaKind, UnresponsiveConfig,
};
use futures::stream::FuturesUnordered;
use futures::{future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use nativelink_config::stores::{RedisMode, RedisSpec};
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
use nativelink_metric::MetricsComponent;
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
use nativelink_util::spawn;
use nativelink_util::store_trait::{
    BoolValue, SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore,
    SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
    SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,
};
use nativelink_util::task::JoinHandleDropGuard;
use parking_lot::{Mutex, RwLock};
use patricia_tree::StringPatriciaMap;
use tokio::select;
use tokio::time::sleep;
use tracing::{event, Level};
use uuid::Uuid;

use crate::cas_utils::is_zero_digest;
use crate::redis_utils::ft_aggregate;

/// The default size of the read chunk when reading data from Redis.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;

/// The default size of the connection pool if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_CONNECTION_POOL_SIZE: usize = 3;

/// The default delay between retries if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_RETRY_DELAY: f32 = 0.1;
/// The amount of jitter to add to the retry delay if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_RETRY_JITTER: f32 = 0.5;

/// The default maximum capacity of the broadcast channel if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_BROADCAST_CHANNEL_CAPACITY: usize = 4096;

/// The default connection timeout in milliseconds if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;

/// The default command timeout in milliseconds if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;

/// The default maximum number of chunk uploads per update.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE: usize = 10;

#[allow(clippy::trivially_copy_pass_by_ref)]
fn to_hex(value: &u32) -> String {
    format!("{value:08x}")
}

/// A [`StoreDriver`] implementation that uses Redis as a backing store.
#[derive(MetricsComponent)]
pub struct RedisStore {
    /// The client pool connecting to the backing Redis instance(s).
    client_pool: RedisPool,

    /// A channel to publish updates to when a key is added, removed, or modified.
    #[metric(
        help = "The pubsub channel to publish updates to when a key is added, removed, or modified"
    )]
    pub_sub_channel: Option<String>,

    /// A redis client for managing subscriptions.
    /// TODO: This should be moved into the store in followups once a standard use pattern has been determined.
    subscriber_client: SubscriberClient,

    /// For metrics only.
    #[metric(
        help = "A unique identifier for the FT.CREATE command used to create the index template",
        handler = to_hex
    )]
    fingerprint_create_index: u32,

    /// A function used to generate names for temporary keys.
    temp_name_generator_fn: fn() -> String,

    /// A common prefix to prepend to all keys before they are sent to Redis.
    ///
    /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`).
    #[metric(help = "Prefix to prepend to all keys before sending to Redis")]
    key_prefix: String,

    /// The amount of data to read from Redis at a time.
    #[metric(help = "The amount of data to read from Redis at a time")]
    read_chunk_size: usize,

    /// The maximum number of chunk uploads per update.
    /// This is used to limit the number of concurrent chunk uploads per update
    /// to prevent overwhelming the Redis server.
    #[metric(help = "The maximum number of chunk uploads per update")]
    max_chunk_uploads_per_update: usize,

    /// Redis script used to update a value in redis if the version matches.
    /// This is done by incrementing the version number and then setting the new data
    /// only if the version number matches the existing version number.
    update_if_version_matches_script: Script,

    /// A manager for subscriptions to keys in Redis.
    subscription_manager: Mutex<Option<Arc<RedisSubscriptionManager>>>,
}

impl RedisStore {
    /// Create a new `RedisStore` from the given configuration.
    pub fn new(mut spec: RedisSpec) -> Result<Arc<Self>, Error> {
        if spec.addresses.is_empty() {
            return Err(make_err!(
                Code::InvalidArgument,
                "No addresses were specified in redis store configuration."
            ));
        };
        let [addr] = spec.addresses.as_slice() else {
            return Err(make_err!(Code::Unimplemented, "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discovery to find the other nodes."));
        };
        let redis_config = match spec.mode {
            RedisMode::Cluster => RedisConfig::from_url_clustered(addr),
            RedisMode::Sentinel => RedisConfig::from_url_sentinel(addr),
            RedisMode::Standard => RedisConfig::from_url_centralized(addr),
        }
        .err_tip_with_code(|e| {
            (
                Code::InvalidArgument,
                format!("while parsing redis node address: {e}"),
            )
        })?;

        let reconnect_policy = {
            if spec.retry.delay == 0.0 {
                spec.retry.delay = DEFAULT_RETRY_DELAY;
            }
            if spec.retry.jitter == 0.0 {
                spec.retry.jitter = DEFAULT_RETRY_JITTER;
            }

            let max_retries = u32::try_from(spec.retry.max_retries)
                .err_tip(|| "max_retries could not be converted to u32 in RedisStore::new")?;
            let min_delay_ms = (spec.retry.delay * 1000.0) as u32;
            let max_delay_ms = 8000;
            let jitter = (spec.retry.jitter * spec.retry.delay * 1000.0) as u32;

            let mut reconnect_policy = ReconnectPolicy::new_exponential(
                max_retries,  /* max_retries, 0 is unlimited */
                min_delay_ms, /* min_delay */
                max_delay_ms, /* max_delay */
                2,            /* mult */
            );
            reconnect_policy.set_jitter(jitter);
            reconnect_policy
        };

        {
            if spec.broadcast_channel_capacity == 0 {
                spec.broadcast_channel_capacity = DEFAULT_BROADCAST_CHANNEL_CAPACITY;
            }
            if spec.connection_timeout_ms == 0 {
                spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS;
            }
            if spec.command_timeout_ms == 0 {
                spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS;
            }
            if spec.connection_pool_size == 0 {
                spec.connection_pool_size = DEFAULT_CONNECTION_POOL_SIZE;
            }
            if spec.read_chunk_size == 0 {
                spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
            }
            if spec.max_chunk_uploads_per_update == 0 {
                spec.max_chunk_uploads_per_update = DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE;
            }
        }
        let connection_timeout = Duration::from_millis(spec.connection_timeout_ms);
        let command_timeout = Duration::from_millis(spec.command_timeout_ms);

        let mut builder = Builder::from_config(redis_config);
        builder
            .set_performance_config(PerformanceConfig {
                default_command_timeout: command_timeout,
                broadcast_channel_capacity: spec.broadcast_channel_capacity,
                ..Default::default()
            })
            .set_connection_config(ConnectionConfig {
                connection_timeout,
                internal_command_timeout: command_timeout,
                unresponsive: UnresponsiveConfig {
                    max_timeout: Some(connection_timeout),
                    // This number needs to be less than the connection timeout.
                    // We use 4 as it is a good balance between not spamming the server
                    // and not waiting too long.
                    interval: connection_timeout / 4,
                },
                ..Default::default()
            })
            .set_policy(reconnect_policy);

        let client_pool = builder
            .build_pool(spec.connection_pool_size)
            .err_tip(|| "while creating redis connection pool")?;

        let subscriber_client = builder
            .build_subscriber_client()
            .err_tip(|| "while creating redis subscriber client")?;

        Self::new_from_builder_and_parts(
            client_pool,
            subscriber_client,
            spec.experimental_pub_sub_channel.clone(),
            || Uuid::new_v4().to_string(),
            spec.key_prefix.clone(),
            spec.read_chunk_size,
            spec.max_chunk_uploads_per_update,
        )
        .map(Arc::new)
    }

    /// Used for testing when determinism is required.
    pub fn new_from_builder_and_parts(
        client_pool: RedisPool,
        subscriber_client: SubscriberClient,
        pub_sub_channel: Option<String>,
        temp_name_generator_fn: fn() -> String,
        key_prefix: String,
        read_chunk_size: usize,
        max_chunk_uploads_per_update: usize,
    ) -> Result<Self, Error> {
        // Start connection pool (this will retry forever by default).
        client_pool.connect();
        subscriber_client.connect();

        Ok(Self {
            client_pool,
            pub_sub_channel,
            subscriber_client,
            fingerprint_create_index: fingerprint_create_index_template(),
            temp_name_generator_fn,
            key_prefix,
            read_chunk_size,
            max_chunk_uploads_per_update,
            update_if_version_matches_script: Script::from_lua(LUA_VERSION_SET_SCRIPT),
            subscription_manager: Mutex::new(None),
        })
    }

    /// Encode a [`StoreKey`] so it can be sent to Redis.
    fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> {
        let key_body = key.as_str();
        if self.key_prefix.is_empty() {
            key_body
        } else {
            // This is in the hot path for all redis operations, so we try to reuse the allocation
            // from `key.as_str()` if possible. (Sketched in `encode_key_sketch_tests` below.)
            match key_body {
                Cow::Owned(mut encoded_key) => {
                    encoded_key.insert_str(0, &self.key_prefix);
                    Cow::Owned(encoded_key)
                }
                Cow::Borrowed(body) => {
                    let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());
                    encoded_key.push_str(&self.key_prefix);
                    encoded_key.push_str(body);
                    Cow::Owned(encoded_key)
                }
            }
        }
    }
}
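
// Editor's illustrative sketch (not part of the original file): the
// allocation-reuse trick in `encode_key` in miniature. When the incoming
// `Cow` already owns its `String`, prepending the prefix reuses that buffer;
// a borrowed `Cow` pays exactly one allocation, sized up front. The
// `prefix_key` helper below is hypothetical and exists only for this test.
#[cfg(test)]
mod encode_key_sketch_tests {
    use std::borrow::Cow;

    // Hypothetical stand-in mirroring `RedisStore::encode_key`'s logic.
    fn prefix_key<'a>(prefix: &str, key_body: Cow<'a, str>) -> Cow<'a, str> {
        if prefix.is_empty() {
            return key_body;
        }
        match key_body {
            Cow::Owned(mut encoded_key) => {
                // Reuse the existing allocation.
                encoded_key.insert_str(0, prefix);
                Cow::Owned(encoded_key)
            }
            Cow::Borrowed(body) => {
                // One allocation with the exact final capacity.
                let mut encoded_key = String::with_capacity(prefix.len() + body.len());
                encoded_key.push_str(prefix);
                encoded_key.push_str(body);
                Cow::Owned(encoded_key)
            }
        }
    }

    #[test]
    fn prefixes_owned_and_borrowed_keys() {
        assert_eq!(prefix_key("ns-", Cow::Owned("abc".to_string())), "ns-abc");
        assert_eq!(prefix_key("ns-", Cow::Borrowed("abc")), "ns-abc");
        assert_eq!(prefix_key("", Cow::Borrowed("abc")), "abc");
    }
}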

#[async_trait]
impl StoreDriver for RedisStore {
    async fn has_with_results(
        self: Pin<&Self>,
        keys: &[StoreKey<'_>],
        results: &mut [Option<u64>],
    ) -> Result<(), Error> {
        // TODO(allada) We could use pipeline here, but it makes retry more
        // difficult and it doesn't work very well in cluster mode.
        // If we wanted to optimize this with pipeline be careful to
        // implement retry and to support cluster mode.
        let client = self.client_pool.next();
        keys.iter()
            .zip(results.iter_mut())
            .map(|(key, result)| async move {
                // We need to do a special pass to ensure our zero key exists.
                if is_zero_digest(key.borrow()) {
                    *result = Some(0);
                    return Ok::<_, Error>(());
                }
                let encoded_key = self.encode_key(key);
                let pipeline = client.pipeline();
                pipeline
                    .strlen::<(), _>(encoded_key.as_ref())
                    .await
                    .err_tip(|| {
                        format!("In RedisStore::has_with_results::strlen for {encoded_key}")
                    })?;
                // Redis returns 0 when the key doesn't exist
                // AND when the key exists with value of length 0.
                // Therefore, we need to check both length and existence
                // and do it in a pipeline for efficiency.
                pipeline
                    .exists::<(), _>(encoded_key.as_ref())
                    .await
                    .err_tip(|| {
                        format!("In RedisStore::has_with_results::exists for {encoded_key}")
                    })?;
                let (blob_len, exists) = pipeline
                    .all::<(u64, bool)>()
                    .await
                    .err_tip(|| "In RedisStore::has_with_results::query")?;

                *result = if exists { Some(blob_len) } else { None };

                Ok::<_, Error>(())
            })
            .collect::<FuturesUnordered<_>>()
            .try_collect()
            .await
    }

    async fn update(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        mut reader: DropCloserReadHalf,
        _upload_size: UploadSizeInfo,
    ) -> Result<(), Error> {
        let final_key = self.encode_key(&key);

        // While the name generation function can be supplied by the user, we need to have the curly
        // braces in place in order to manage redis' hashing behavior and make sure that the temporary
        // key name and the final key name are directed to the same cluster node. See
        // https://redis.io/blog/redis-clustering-best-practices-with-keys/
        //
        // The TL;DR is that if we're in cluster mode and the names hash differently, we can't use request
        // pipelining. By using these braces, we tell redis to only hash the part of the temporary key that's
        // identical to the final key -- so they will always hash to the same node.
        // (A standalone sketch of this scheme is in `store_driver_sketch_tests` below.)
        let temp_key = format!(
            "temp-{}-{{{}}}",
            (self.temp_name_generator_fn)(),
            &final_key
        );

        if is_zero_digest(key.borrow()) {
            let chunk = reader
                .peek()
                .await
                .err_tip(|| "Failed to peek in RedisStore::update")?;
            if chunk.is_empty() {
                reader
                    .drain()
                    .await
                    .err_tip(|| "Failed to drain in RedisStore::update")?;
                // Zero-digest keys are special -- we don't need to do anything with it.
                return Ok(());
            }
        }

        let client = self.client_pool.next();

        let mut read_stream = reader
            .scan(0u32, |bytes_read, chunk_res| {
                future::ready(Some(
                    chunk_res
                        .err_tip(|| "Failed to read chunk in update in redis store")
                        .and_then(|chunk| {
                            let offset = *bytes_read;
                            let chunk_len = u32::try_from(chunk.len()).err_tip(|| {
                                "Could not convert chunk length to u32 in RedisStore::update"
                            })?;
                            let new_bytes_read = bytes_read
                                .checked_add(chunk_len)
                                .err_tip(|| "Overflow protection in RedisStore::update")?;
                            *bytes_read = new_bytes_read;
                            Ok::<_, Error>((offset, *bytes_read, chunk))
                        }),
                ))
            })
            .map(|res| {
                let (offset, end_pos, chunk) = res?;
                let temp_key_ref = &temp_key;
                Ok(async move {
                    client
                        .setrange::<(), _, _>(temp_key_ref, offset, chunk)
                        .await
                        .err_tip(|| "While appending to temp key in RedisStore::update")?;
                    Ok::<u32, Error>(end_pos)
                })
            })
            .try_buffer_unordered(self.max_chunk_uploads_per_update);

        let mut total_len: u32 = 0;
        while let Some(last_pos) = read_stream.try_next().await? {
            if last_pos > total_len {
                total_len = last_pos;
            }
        }

        let blob_len = client
            .strlen::<u64, _>(&temp_key)
            .await
            .err_tip(|| format!("In RedisStore::update strlen check for {temp_key}"))?;
        // This is a safety check to ensure that in the event some kind of retry was to happen
        // and the data was appended to the key twice, we reject the data.
        if blob_len != u64::from(total_len) {
            return Err(make_input_err!(
                "Data length mismatch in RedisStore::update for {}({}) - expected {} bytes, got {} bytes",
                key.borrow().as_str(),
                temp_key,
                total_len,
                blob_len,
            ));
        }

        // Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost.
        client
            .rename::<(), _, _>(&temp_key, final_key.as_ref())
            .await
            .err_tip(|| "While queueing key rename in RedisStore::update()")?;

        // If we have a publish channel configured, send a notice that the key has been set.
        if let Some(pub_sub_channel) = &self.pub_sub_channel {
            return Ok(client.publish(pub_sub_channel, final_key.as_ref()).await?);
        };

        Ok(())
    }

    async fn get_part(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        writer: &mut DropCloserWriteHalf,
        offset: u64,
        length: Option<u64>,
    ) -> Result<(), Error> {
        let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?;
        let length = length
            .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
            .transpose()?;

        // To follow RBE spec we need to consider any digests with
        // zero size to be existing.
        if is_zero_digest(key.borrow()) {
            return writer
                .send_eof()
                .err_tip(|| "Failed to send zero EOF in redis store get_part");
        }

        let client = self.client_pool.next();
        let encoded_key = self.encode_key(&key);
        let encoded_key = encoded_key.as_ref();

        // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we
        // do math with indices we change them to be exclusive at the end.
        // (See the chunk-bounds sketch in `store_driver_sketch_tests` below.)

        // We want to read the data at the key from `offset` to `offset + length`.
        let data_start = offset;
        let data_end = data_start
            .saturating_add(length.unwrap_or(isize::MAX as usize))
            .saturating_sub(1);

        // And we don't ever want to read more than `read_chunk_size` bytes at a time, so we'll need to iterate.
        let mut chunk_start = data_start;
        let mut chunk_end = cmp::min(
            data_start.saturating_add(self.read_chunk_size) - 1,
            data_end,
        );

        loop {
            let chunk: Bytes = client
                .getrange(encoded_key, chunk_start, chunk_end)
                .await
                .err_tip(|| "In RedisStore::get_part::getrange")?;

            let didnt_receive_full_chunk = chunk.len() < self.read_chunk_size;
            let reached_end_of_data = chunk_end == data_end;

            if didnt_receive_full_chunk || reached_end_of_data {
                if !chunk.is_empty() {
                    writer
                        .send(chunk)
                        .await
                        .err_tip(|| "Failed to write data in RedisStore::get_part")?;
                }

                break; // No more data to read.
            }

            // We received a full chunk's worth of data, so write it...
            writer
                .send(chunk)
                .await
                .err_tip(|| "Failed to write data in RedisStore::get_part")?;

            // ...and go grab the next chunk.
            chunk_start = chunk_end + 1;
            chunk_end = cmp::min(
                chunk_start.saturating_add(self.read_chunk_size) - 1,
                data_end,
            );
        }

        // If we didn't write any data, check if the key exists; if not, return a NotFound error.
        // This is required by spec.
        if writer.get_bytes_written() == 0 {
            // We're supposed to read 0 bytes, so just check if the key exists.
            let exists = client
                .exists::<bool, _>(encoded_key)
                .await
                .err_tip(|| "In RedisStore::get_part::zero_exists")?;

            if !exists {
                return Err(make_err!(
                    Code::NotFound,
                    "Data not found in Redis store for digest: {key:?}"
                ));
            }
        }

        writer
            .send_eof()
            .err_tip(|| "Failed to write EOF in redis store get_part")
    }

    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }

    fn as_any(&self) -> &(dyn std::any::Any + Sync + Send) {
        self
    }

    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send> {
        self
    }

    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
        registry.register_indicator(self);
    }
}
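
// Editor's illustrative sketches (not part of the original file) for two
// ideas explained in the `StoreDriver` impl above: the `{...}` hash tag that
// pins the temporary upload key and the final key to the same Redis cluster
// slot, and the inclusive-range arithmetic used by `get_part` when chunking
// GETRANGE calls. The key and sizes below are made-up example values.
#[cfg(test)]
mod store_driver_sketch_tests {
    #[test]
    fn temp_key_shares_hash_tag_with_final_key() {
        // Redis cluster hashes only the substring inside `{...}` when braces
        // are present, so `temp-<name>-{final}` and `final` land on the same
        // node. (`my-uuid` stands in for the configured name generator.)
        let final_key = "prefix-somedigest";
        let temp_key = format!("temp-{}-{{{}}}", "my-uuid", final_key);
        assert_eq!(temp_key, "temp-my-uuid-{prefix-somedigest}");
    }

    #[test]
    fn getrange_bounds_are_inclusive() {
        // Reading 10 bytes starting at offset 0 in 4-byte chunks: GETRANGE is
        // inclusive on both ends, hence the `- 1` when computing chunk ends.
        let (read_chunk_size, data_start, data_end) = (4usize, 0usize, 9usize);
        let mut bounds = Vec::new();
        let mut chunk_start = data_start;
        loop {
            let chunk_end = std::cmp::min(chunk_start + read_chunk_size - 1, data_end);
            bounds.push((chunk_start, chunk_end));
            if chunk_end == data_end {
                break;
            }
            chunk_start = chunk_end + 1;
        }
        assert_eq!(bounds, vec![(0, 3), (4, 7), (8, 9)]);
    }
}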

#[async_trait]
impl HealthStatusIndicator for RedisStore {
    fn get_name(&self) -> &'static str {
        "RedisStore"
    }

    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
        StoreDriver::check_health(Pin::new(self), namespace).await
    }
}

/// -------------------------------------------------------------------
/// Everything below this line is specific to the redis scheduler implementation.
/// -------------------------------------------------------------------

/// The maximum number of results to return per cursor.
const MAX_COUNT_PER_CURSOR: u64 = 256;
/// The time in milliseconds that a redis cursor can be idle before it is closed.
const CURSOR_IDLE_MS: u64 = 2_000;
/// The name of the field in the Redis hash that stores the data.
const DATA_FIELD_NAME: &str = "data";
/// The name of the field in the Redis hash that stores the version.
const VERSION_FIELD_NAME: &str = "version";
/// The time to live of indexes in seconds. After this time redis may delete the index.
const INDEX_TTL_S: u64 = 60 * 60 * 24; // 24 hours.

/// String of the `FT.CREATE` command used to create the index template. It is done this
/// way so we can use it in both const (compile time) functions and runtime functions.
/// This is a macro because we need to use it in multiple places that sometimes require the
/// data as different data types (specifically for rust's format_args! macro).
macro_rules! get_create_index_template {
    () => {
        "FT.CREATE {} ON HASH PREFIX 1 {} NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA {} TAG CASESENSITIVE SORTABLE"
    }
}
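
// Editor's illustrative sketch (not part of the original file): the three
// `{}` slots in the template are the index name, the hash key prefix, and
// the indexed field. `my_index`, `prefix_`, and `my_field` are made-up
// values used only for this rendering check.
#[cfg(test)]
mod index_template_sketch_tests {
    #[test]
    fn template_renders_ft_create_command() {
        let rendered = format!(get_create_index_template!(), "my_index", "prefix_", "my_field");
        assert!(rendered.starts_with("FT.CREATE my_index ON HASH PREFIX 1 prefix_ "));
        assert!(rendered.ends_with("SCHEMA my_field TAG CASESENSITIVE SORTABLE"));
    }
}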

/// Lua script to set a key if the version matches.
/// Args:
///   KEYS[1]: The key where the version is stored.
///   ARGV[1]: The expected version.
///   ARGV[2]: The new data.
///   ARGV[3*]: Key-value pairs of additional data to include.
/// Returns:
///   The new version if the version matches. 0 is returned if the
///   expected version did not match and the value was not set.
const LUA_VERSION_SET_SCRIPT: &str = formatcp!(
    r"
local key = KEYS[1]
local expected_version = tonumber(ARGV[1])
local new_data = ARGV[2]
local new_version = redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', 1)
local i
local indexes = {{}}

if new_version-1 ~= expected_version then
    redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', -1)
    return 0
end
-- Skip first 2 argvs, as they are known inputs.
-- Remember: Lua is 1-indexed.
for i=3, #ARGV do
    indexes[i-2] = ARGV[i]
end

-- In testing we witnessed redis sometimes not update our FT indexes
-- resulting in stale data. It appears if we delete our keys then insert
-- them again it works and reduces risk significantly.
redis.call('DEL', key)
redis.call('HSET', key, '{DATA_FIELD_NAME}', new_data, '{VERSION_FIELD_NAME}', new_version, unpack(indexes))

return new_version
"
);
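
// Editor's illustrative sketch (not part of the original file): `formatcp!`
// substitutes the field-name constants into the script at compile time, and
// the doubled `{{}}` collapses to a literal Lua table constructor. These
// assertions only inspect the generated script text; no Redis is involved.
#[cfg(test)]
mod lua_version_script_sketch_tests {
    use super::*;

    #[test]
    fn script_embeds_field_names_at_compile_time() {
        assert!(LUA_VERSION_SET_SCRIPT.contains(VERSION_FIELD_NAME));
        assert!(LUA_VERSION_SET_SCRIPT.contains(DATA_FIELD_NAME));
        // `{{}}` in the formatcp! input becomes `{}` in the final script.
        assert!(LUA_VERSION_SET_SCRIPT.contains("local indexes = {}"));
        // The optimistic-concurrency check rolls back the HINCRBY on mismatch.
        assert!(LUA_VERSION_SET_SCRIPT.contains("if new_version-1 ~= expected_version then"));
    }
}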

/// Compile-time fingerprint of the `FT.CREATE` command used to create the index template.
/// This is a simple CRC32 checksum of the command string. We don't care about it actually
/// being a valid CRC32 checksum, just that it's a unique identifier with a low chance of
/// collision.
const fn fingerprint_create_index_template() -> u32 {
    const POLY: u32 = 0xEDB8_8320;
    const DATA: &[u8] = get_create_index_template!().as_bytes();
    let mut crc = 0xFFFF_FFFF;
    let mut i = 0;
    while i < DATA.len() {
        let byte = DATA[i];
        crc ^= byte as u32;

        let mut j = 0;
        while j < 8 {
            crc = if crc & 1 != 0 {
                (crc >> 1) ^ POLY
            } else {
                crc >> 1
            };
            j += 1;
        }
        i += 1;
    }
    crc
}
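
// Editor's illustrative sketch (not part of the original file): the
// fingerprint is a `const fn` over a constant string, so it can be computed
// at compile time, and equal inputs always yield the same 32-bit value that
// `to_hex` renders as exactly eight hex digits.
#[cfg(test)]
mod fingerprint_sketch_tests {
    use super::*;

    #[test]
    fn fingerprint_is_deterministic_and_const_evaluable() {
        const FP: u32 = fingerprint_create_index_template();
        assert_eq!(FP, fingerprint_create_index_template());
        assert_eq!(to_hex(&FP).len(), 8);
    }
}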

/// Get the name of the index to create for the given field.
/// This will add some prefix data to the name to try and ensure
/// if the index definition changes, the index will get a new name.
macro_rules! get_index_name {
    ($prefix:expr, $field:expr, $maybe_sort:expr) => {
        format_args!(
            "{}_{}_{}_{:08x}",
            $prefix,
            $field,
            $maybe_sort.unwrap_or(""),
            fingerprint_create_index_template(),
        )
    };
}

/// Try to sanitize a string to be used as a Redis key.
/// We don't actually modify the string, just check if it's valid.
const fn try_sanitize(s: &str) -> Option<&str> {
    // Note: We cannot use for loops or iterators here because they are not const.
    // Using a const function here gives the compiler the ability to optimize this
    // function away entirely in the case where the input is constant.
    let chars = s.as_bytes();
    let mut i: usize = 0;
    let len = s.len();
    loop {
        if i >= len {
            break;
        }
        let c = chars[i];
        if !c.is_ascii_alphanumeric() && c != b'_' {
            return None;
        }
        i += 1;
    }
    Some(s)
}
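
// Editor's illustrative sketch (not part of the original file): only
// `[A-Za-z0-9_]` passes validation; a single disallowed byte rejects the
// whole string, and accepted strings are returned unchanged.
#[cfg(test)]
mod try_sanitize_sketch_tests {
    use super::*;

    #[test]
    fn accepts_alphanumerics_and_underscores_only() {
        assert_eq!(try_sanitize("abc_123"), Some("abc_123"));
        assert_eq!(try_sanitize("bad-key"), None);
        assert_eq!(try_sanitize("has space"), None);
    }
}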

/// An individual subscription to a key in Redis.
pub struct RedisSubscription {
    receiver: Option<tokio::sync::watch::Receiver<String>>,
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
}

impl SchedulerSubscription for RedisSubscription {
    /// Wait for the subscription key to change.
    async fn changed(&mut self) -> Result<(), Error> {
        let receiver = self
            .receiver
            .as_mut()
            .ok_or_else(|| make_err!(Code::Internal, "In RedisSubscription::changed::as_mut"))?;
        receiver
            .changed()
            .await
            .map_err(|_| make_err!(Code::Internal, "In RedisSubscription::changed::changed"))
    }
}

// If the subscription is dropped, we need to possibly remove the key from the
// subscribed keys map.
impl Drop for RedisSubscription {
    fn drop(&mut self) {
        let Some(receiver) = self.receiver.take() else {
            event!(
                Level::WARN,
                "RedisSubscription has already been dropped, nothing to do."
            );
            return; // Already dropped, nothing to do.
        };
        let key = receiver.borrow().clone();
        // IMPORTANT: This must be dropped before receiver_count() is called.
        drop(receiver);
        let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else {
            return; // Already dropped, nothing to do.
        };
        let mut subscribed_keys = subscribed_keys.write();
        let Some(value) = subscribed_keys.get(&key) else {
            event!(
                Level::ERROR,
                "Key {key} was not found in subscribed keys when checking if it should be removed."
            );
            return;
        };
        // If we have no receivers, cleanup the entry from our map.
        if value.receiver_count() == 0 {
            subscribed_keys.remove(key);
        }
    }
}

/// A publisher for a key in Redis.
struct RedisSubscriptionPublisher {
    sender: Mutex<tokio::sync::watch::Sender<String>>,
}

impl RedisSubscriptionPublisher {
    fn new(
        key: String,
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
    ) -> (Self, RedisSubscription) {
        let (sender, receiver) = tokio::sync::watch::channel(key);
        let publisher = Self {
            sender: Mutex::new(sender),
        };
        let subscription = RedisSubscription {
            receiver: Some(receiver),
            weak_subscribed_keys,
        };
        (publisher, subscription)
    }

    fn subscribe(
        &self,
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
    ) -> RedisSubscription {
        let receiver = self.sender.lock().subscribe();
        RedisSubscription {
            receiver: Some(receiver),
            weak_subscribed_keys,
        }
    }

    fn receiver_count(&self) -> usize {
        self.sender.lock().receiver_count()
    }

    fn notify(&self) {
        // TODO(https://github.com/sile/patricia_tree/issues/40) When this is addressed
        // we can remove the `Mutex` and use the mutable iterator directly.
        self.sender.lock().send_modify(|_| {});
    }
}

pub struct RedisSubscriptionManager {
    subscribed_keys: Arc<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
    tx_for_test: tokio::sync::mpsc::UnboundedSender<String>,
    _subscription_spawn: JoinHandleDropGuard<()>,
}

impl RedisSubscriptionManager {
    pub fn new(subscribe_client: SubscriberClient, pub_sub_channel: String) -> Self {
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);
        let (tx_for_test, mut rx_for_test) = tokio::sync::mpsc::unbounded_channel();
        Self {
            subscribed_keys,
            tx_for_test,
            _subscription_spawn: spawn!("redis_subscribe_spawn", async move {
                let mut rx = subscribe_client.message_rx();
                loop {
                    if let Err(e) = subscribe_client.subscribe(&pub_sub_channel).await {
                        event!(Level::ERROR, "Error subscribing to pattern - {e}");
                        return;
                    }
                    let mut reconnect_rx = subscribe_client.reconnect_rx();
                    let reconnect_fut = reconnect_rx.recv().fuse();
                    tokio::pin!(reconnect_fut);
                    loop {
                        let key = select! {
                            value = rx_for_test.recv() => {
                                let Some(value) = value else {
                                    unreachable!("Channel should never close");
                                };
                                value.into()
                            },
                            msg = rx.recv() => {
                                match msg {
                                    Ok(msg) => {
                                        if let RedisValue::String(s) = msg.value {
                                            s
                                        } else {
                                            event!(Level::ERROR, "Received non-string message in RedisSubscriptionManager");
                                            continue;
                                        }
                                    },
                                    Err(e) => {
                                        // Check to see if our parent has been dropped and if so kill spawn.
                                        if subscribed_keys_weak.upgrade().is_none() {
                                            event!(Level::WARN, "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn");
                                            return;
                                        };
                                        event!(Level::ERROR, "Error receiving message in RedisSubscriptionManager reconnecting and flagging everything changed - {e}");
                                        break;
                                    }
                                }
                            },
                            _ = &mut reconnect_fut => {
                                event!(Level::WARN, "Redis reconnected flagging all subscriptions as changed and resuming");
                                break;
                            }
                        };
                        let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
                            event!(Level::WARN, "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn");
                            return;
                        };
                        let subscribed_keys_mux = subscribed_keys.read();
                        subscribed_keys_mux
                            .common_prefix_values(&*key)
                            .for_each(RedisSubscriptionPublisher::notify);
                    }
                    // Sleep for a small amount of time to ensure we don't reconnect too quickly.
                    sleep(Duration::from_secs(1)).await;
                    // If we reconnect or lag behind we might have had dirty keys, so we need to
                    // flag all of them as changed.
                    let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
                        event!(Level::WARN, "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn");
                        return;
                    };
                    let subscribed_keys_mux = subscribed_keys.read();
                    // Just in case also get a new receiver.
                    rx = subscribe_client.message_rx();
                    // Drop all buffered messages, then flag everything as changed.
                    rx.resubscribe();
                    for publisher in subscribed_keys_mux.values() {
                        publisher.notify();
                    }
                }
            }),
        }
    }
}
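
// Editor's illustrative sketch (not part of the original file): publishers
// live in a `StringPatriciaMap` keyed by subscription key, and a message for
// key `K` wakes every subscription whose key is a prefix of `K` via
// `common_prefix_values`. The `jobs/123` key is a made-up example; the watch
// channel is observed synchronously here, so no async runtime is needed.
#[cfg(test)]
mod subscription_sketch_tests {
    use super::*;

    #[test]
    fn notify_wakes_prefix_subscribers() {
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
        let weak_keys = Arc::downgrade(&subscribed_keys);
        let (publisher, subscription) =
            RedisSubscriptionPublisher::new("jobs/123".to_string(), weak_keys);
        subscribed_keys.write().insert("jobs/123", publisher);

        // Simulate a pub/sub message whose key extends the subscribed key.
        let map = subscribed_keys.read();
        map.common_prefix_values("jobs/123/attempt-1")
            .for_each(RedisSubscriptionPublisher::notify);

        // The subscription's watch receiver observed the wakeup.
        assert!(subscription.receiver.as_ref().unwrap().has_changed().unwrap());
    }
}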

impl SchedulerSubscriptionManager for RedisSubscriptionManager {
    type Subscription = RedisSubscription;

    fn notify_for_test(&self, value: String) {
        self.tx_for_test.send(value).unwrap();
    }

    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
    where
        K: SchedulerStoreKeyProvider,
    {
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
        let mut subscribed_keys = self.subscribed_keys.write();
        let key = key.get_key();
        let key_str = key.as_str();
        let mut subscription = if let Some(publisher) = subscribed_keys.get(&key_str) {
            publisher.subscribe(weak_subscribed_keys)
        } else {
            let (publisher, subscription) =
                RedisSubscriptionPublisher::new(key_str.to_string(), weak_subscribed_keys);
            subscribed_keys.insert(key_str, publisher);
            subscription
        };
        subscription
            .receiver
            .as_mut()
            .ok_or_else(|| {
                make_err!(
                    Code::Internal,
                    "Receiver should be set in RedisSubscriptionManager::subscribe"
                )
            })?
            .mark_changed();

        Ok(subscription)
    }
}

impl SchedulerStore for RedisStore {
    type SubscriptionManager = RedisSubscriptionManager;

    fn subscription_manager(&self) -> Result<Arc<RedisSubscriptionManager>, Error> {
        let mut subscription_manager = self.subscription_manager.lock();
        match &*subscription_manager {
            Some(subscription_manager) => Ok(subscription_manager.clone()),
            None => {
                let Some(pub_sub_channel) = &self.pub_sub_channel else {
                    return Err(make_input_err!("RedisStore must have a pubsub channel for a Redis Scheduler if using subscriptions"));
                };
                let sub = Arc::new(RedisSubscriptionManager::new(
                    self.subscriber_client.clone(),
                    pub_sub_channel.clone(),
                ));
                *subscription_manager = Some(sub.clone());
                Ok(sub)
            }
        }
    }

    async fn update_data<T>(&self, data: T) -> Result<Option<u64>, Error>
    where
        T: SchedulerStoreDataProvider
            + SchedulerStoreKeyProvider
            + SchedulerCurrentVersionProvider
            + Send,
    {
        let key = data.get_key();
        let key = self.encode_key(&key);
        let client = self.client_pool.next();
        let maybe_index = data.get_indexes().err_tip(|| {
            format!("Err getting index in RedisStore::update_data::versioned for {key:?}")
        })?;
        if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {
            let current_version = data.current_version();
            let data = data.try_into_bytes().err_tip(|| {
                format!("Could not convert value to bytes in RedisStore::update_data::versioned for {key:?}")
            })?;
            let mut argv = Vec::with_capacity(3 + maybe_index.len() * 2);
            argv.push(Bytes::from(format!("{current_version}")));
            argv.push(data);
            for (name, value) in maybe_index {
                argv.push(Bytes::from_static(name.as_bytes()));
                argv.push(value);
            }
            let new_version = self
                .update_if_version_matches_script
                .evalsha_with_reload::<u64, _, Vec<Bytes>>(client, vec![key.as_ref()], argv)
                .await
                .err_tip(|| format!("In RedisStore::update_data::versioned for {key:?}"))?;
            if new_version == 0 {
                return Ok(None);
            }
            // If we have a publish channel configured, send a notice that the key has been set.
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
                return Ok(client.publish(pub_sub_channel, key.as_ref()).await?);
            };
            Ok(Some(new_version))
        } else {
            let data = data.try_into_bytes().err_tip(|| {
                format!("Could not convert value to bytes in RedisStore::update_data::noversion for {key:?}")
            })?;
            let mut fields = RedisMap::new();
            fields.reserve(1 + maybe_index.len());
            fields.insert(DATA_FIELD_NAME.into(), data.into());
            for (name, value) in maybe_index {
                fields.insert(name.into(), value.into());
            }
            client
                .hset::<(), _, _>(key.as_ref(), fields)
                .await
                .err_tip(|| format!("In RedisStore::update_data::noversion for {key:?}"))?;
            // If we have a publish channel configured, send a notice that the key has been set.
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
                return Ok(client.publish(pub_sub_channel, key.as_ref()).await?);
            };
            Ok(Some(0)) // Always use "0" version since this is not a versioned request.
        }
    }

    async fn search_by_index_prefix<K>(
        &self,
        index: K,
    ) -> Result<
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
        Error,
    >
    where
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
    {
        let index_value = index.index_value();
        let run_ft_aggregate = || {
            let client = self.client_pool.next().clone();
            let sanitized_field = try_sanitize(index_value.as_ref()).err_tip(|| {
                format!("In RedisStore::search_by_index_prefix::try_sanitize - {index_value:?}")
            })?;
            Ok::<_, Error>(async move {
                ft_aggregate(
                    client,
                    format!(
                        "{}",
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
                    ),
                    format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_field),
                    fred::types::FtAggregateOptions {
                        load: Some(fred::types::Load::Some(vec![
                            fred::types::SearchField {
                                identifier: DATA_FIELD_NAME.into(),
                                property: None,
                            },
                            fred::types::SearchField {
                                identifier: VERSION_FIELD_NAME.into(),
                                property: None,
                            },
                        ])),
                        cursor: Some(fred::types::WithCursor {
                            count: Some(MAX_COUNT_PER_CURSOR),
                            max_idle: Some(CURSOR_IDLE_MS),
                        }),
                        pipeline: vec![fred::types::AggregateOperation::SortBy {
                            properties: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| {
                                vec![(format!("@{v}").into(), fred::types::SortOrder::Asc)]
                            }),
                            max: None,
                        }],
                        ..Default::default()
                    },
                )
                .await
            })
        };
        let stream = run_ft_aggregate()?
            .or_else(|_| async move {
                let mut schema = vec![SearchSchema {
                    field_name: K::INDEX_NAME.into(),
                    alias: None,
                    kind: SearchSchemaKind::Tag {
                        sortable: false,
                        unf: false,
                        separator: None,
                        casesensitive: false,
                        withsuffixtrie: false,
                        noindex: false,
                    },
                }];
                if let Some(sort_key) = K::MAYBE_SORT_KEY {
                    schema.push(SearchSchema {
                        field_name: sort_key.into(),
                        alias: None,
                        kind: SearchSchemaKind::Tag {
                            sortable: true,
                            unf: false,
                            separator: None,
                            casesensitive: false,
                            withsuffixtrie: false,
                            noindex: false,
                        },
                    });
                }
                let create_result = self
                    .client_pool
                    .next()
                    .ft_create::<(), _>(
                        format!(
                            "{}",
                            get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
                        ),
                        FtCreateOptions {
                            on: Some(fred::types::IndexKind::Hash),
                            prefixes: vec![K::KEY_PREFIX.into()],
                            nohl: true,
                            nofields: true,
                            nofreqs: true,
                            nooffsets: true,
                            temporary: Some(INDEX_TTL_S),
                            ..Default::default()
                        },
                        schema,
                    )
                    .await
                    .err_tip(|| {
                        format!(
                            "Error with ft_create in RedisStore::search_by_index_prefix({})",
                            get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
                        )
                    });
                let run_result = run_ft_aggregate()?.await.err_tip(|| {
                    format!(
                        "Error with second ft_aggregate in RedisStore::search_by_index_prefix({})",
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
                    )
                });
                // Creating the index can race, which is ok. If creation fails, we only
                // surface an error when the second ft_aggregate call also fails.
                run_result.or_else(move |e| create_result.merge(Err(e)))
            })
            .await?;
        Ok(stream.map(|result| {
            let mut redis_map =
                result.err_tip(|| "Error in stream in RedisStore::search_by_index_prefix")?;
            let bytes_data = redis_map
                .remove(&RedisKey::from_static_str(DATA_FIELD_NAME))
                .err_tip(|| "Missing data field in RedisStore::search_by_index_prefix")?
                .into_bytes()
                .err_tip(|| {
                    formatcp!("'{DATA_FIELD_NAME}' is not Bytes in RedisStore::search_by_index_prefix::into_bytes")
                })?;
            let version = if <K as SchedulerIndexProvider>::Versioned::VALUE {
                redis_map
                    .remove(&RedisKey::from_static_str(VERSION_FIELD_NAME))
                    .err_tip(|| "Missing version field in RedisStore::search_by_index_prefix")?
                    .as_u64()
                    .err_tip(|| {
                        formatcp!("'{VERSION_FIELD_NAME}' is not u64 in RedisStore::search_by_index_prefix::as_u64")
                    })?
            } else {
                0
            };
            K::decode(version, bytes_data)
                .err_tip(|| "In RedisStore::search_by_index_prefix::decode")
        }))
    }

    async fn get_and_decode<K>(
        &self,
        key: K,
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
    where
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
    {
        let key = key.get_key();
        let key = self.encode_key(&key);
        let client = self.client_pool.next();
        let (maybe_version, maybe_data) = client
            .hmget::<(Option<u64>, Option<Bytes>), _, _>(
                key.as_ref(),
                vec![
                    RedisKey::from(VERSION_FIELD_NAME),
                    RedisKey::from(DATA_FIELD_NAME),
                ],
            )
            .await
            .err_tip(|| format!("In RedisStore::get_without_version::notversioned {key}"))?;
        let Some(data) = maybe_data else {
            return Ok(None);
        };
        Ok(Some(K::decode(maybe_version.unwrap_or(0), data).err_tip(
            || format!("In RedisStore::get_with_version::notversioned::decode {key}"),
        )?))
    }
}