Coverage Report

Created: 2024-12-19 04:04

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/redis_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::cmp;
17
use std::pin::Pin;
18
use std::sync::{Arc, Weak};
19
use std::time::Duration;
20
21
use async_trait::async_trait;
22
use bytes::Bytes;
23
use const_format::formatcp;
24
use fred::clients::{Pool as RedisPool, SubscriberClient};
25
use fred::interfaces::{ClientLike, KeysInterface, PubsubInterface};
26
use fred::prelude::{EventInterface, HashesInterface, RediSearchInterface};
27
use fred::types::config::{
28
    Config as RedisConfig, ConnectionConfig, PerformanceConfig, ReconnectPolicy, UnresponsiveConfig,
29
};
30
use fred::types::redisearch::{
31
    AggregateOperation, FtAggregateOptions, FtCreateOptions, IndexKind, Load, SearchField,
32
    SearchSchema, SearchSchemaKind, WithCursor,
33
};
34
use fred::types::scripts::Script;
35
use fred::types::{Builder, Key as RedisKey, Map as RedisMap, SortOrder, Value as RedisValue};
36
use futures::stream::FuturesUnordered;
37
use futures::{future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
38
use nativelink_config::stores::{RedisMode, RedisSpec};
39
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
40
use nativelink_metric::MetricsComponent;
41
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
42
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
43
use nativelink_util::spawn;
44
use nativelink_util::store_trait::{
45
    BoolValue, SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore,
46
    SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
47
    SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,
48
};
49
use nativelink_util::task::JoinHandleDropGuard;
50
use parking_lot::{Mutex, RwLock};
51
use patricia_tree::StringPatriciaMap;
52
use tokio::select;
53
use tokio::time::sleep;
54
use tracing::{event, Level};
55
use uuid::Uuid;
56
57
use crate::cas_utils::is_zero_digest;
58
use crate::redis_utils::ft_aggregate;
59
60
/// The default size of the read chunk when reading data from Redis.
61
/// Note: If this changes it should be updated in the config documentation.
62
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;
63
64
/// The default size of the connection pool if not specified.
65
/// Note: If this changes it should be updated in the config documentation.
66
const DEFAULT_CONNECTION_POOL_SIZE: usize = 3;
67
68
/// The default delay between retries if not specified.
69
/// Note: If this changes it should be updated in the config documentation.
70
const DEFAULT_RETRY_DELAY: f32 = 0.1;
71
/// The amount of jitter to add to the retry delay if not specified.
72
/// Note: If this changes it should be updated in the config documentation.
73
const DEFAULT_RETRY_JITTER: f32 = 0.5;
74
75
/// The default maximum capacity of the broadcast channel if not specified.
76
/// Note: If this changes it should be updated in the config documentation.
77
const DEFAULT_BROADCAST_CHANNEL_CAPACITY: usize = 4096;
78
79
/// The default connection timeout in milliseconds if not specified.
80
/// Note: If this changes it should be updated in the config documentation.
81
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;
82
83
/// The default command timeout in milliseconds if not specified.
84
/// Note: If this changes it should be updated in the config documentation.
85
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;
86
87
/// The default maximum number of chunk uploads per update.
88
/// Note: If this changes it should be updated in the config documentation.
89
const DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE: usize = 10;
90
91
#[allow(clippy::trivially_copy_pass_by_ref)]
92
1
fn to_hex(value: &u32) -> String {
93
1
    format!("{value:08x}")
94
1
}
95
96
/// A [`StoreDriver`] implementation that uses Redis as a backing store.
97
#[derive(MetricsComponent)]
98
pub struct RedisStore {
99
    /// The client pool connecting to the backing Redis instance(s).
100
    client_pool: RedisPool,
101
102
    /// A channel to publish updates to when a key is added, removed, or modified.
103
    #[metric(
104
        help = "The pubsub channel to publish updates to when a key is added, removed, or modified"
105
    )]
106
    pub_sub_channel: Option<String>,
107
108
    /// A redis client for managing subscriptions.
109
    /// TODO: This should be moved into the store in followups once a standard use pattern has been determined.
110
    subscriber_client: SubscriberClient,
111
112
    /// For metrics only.
113
    #[metric(
114
        help = "A unique identifier for the FT.CREATE command used to create the index template",
115
        handler = to_hex
116
    )]
117
    fingerprint_create_index: u32,
118
119
    /// A function used to generate names for temporary keys.
120
    temp_name_generator_fn: fn() -> String,
121
122
    /// A common prefix to prepend to all keys before they are sent to Redis.
123
    ///
124
    /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`).
125
    #[metric(help = "Prefix to append to all keys before sending to Redis")]
126
    key_prefix: String,
127
128
    /// The amount of data to read from Redis at a time.
129
    #[metric(help = "The amount of data to read from Redis at a time")]
130
    read_chunk_size: usize,
131
132
    /// The maximum number of chunk uploads per update.
133
    /// This is used to limit the number of chunk uploads per update to prevent
134
    #[metric(help = "The maximum number of chunk uploads per update")]
135
    max_chunk_uploads_per_update: usize,
136
137
    /// Redis script used to update a value in redis if the version matches.
138
    /// This is done by incrementing the version number and then setting the new data
139
    /// only if the version number matches the existing version number.
140
    update_if_version_matches_script: Script,
141
142
    /// A manager for subscriptions to keys in Redis.
143
    subscription_manager: Mutex<Option<Arc<RedisSubscriptionManager>>>,
144
}
145
146
impl RedisStore {
147
    /// Create a new `RedisStore` from the given configuration.
148
0
    pub fn new(mut spec: RedisSpec) -> Result<Arc<Self>, Error> {
149
0
        if spec.addresses.is_empty() {
  Branch (149:12): [True: 0, False: 0]
  Branch (149:12): [Folded - Ignored]
150
0
            return Err(make_err!(
151
0
                Code::InvalidArgument,
152
0
                "No addresses were specified in redis store configuration."
153
0
            ));
154
0
        };
155
0
        let [addr] = spec.addresses.as_slice() else {
  Branch (155:13): [True: 0, False: 0]
  Branch (155:13): [Folded - Ignored]
156
0
            return Err(make_err!(Code::Unimplemented, "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discover to find the other nodes."));
157
        };
158
0
        let redis_config = match spec.mode {
159
0
            RedisMode::Cluster => RedisConfig::from_url_clustered(addr),
160
0
            RedisMode::Sentinel => RedisConfig::from_url_sentinel(addr),
161
0
            RedisMode::Standard => RedisConfig::from_url_centralized(addr),
162
        }
163
0
        .err_tip_with_code(|e| {
164
0
            (
165
0
                Code::InvalidArgument,
166
0
                format!("while parsing redis node address: {e}"),
167
0
            )
168
0
        })?;
169
170
0
        let reconnect_policy = {
171
0
            if spec.retry.delay == 0.0 {
  Branch (171:16): [True: 0, False: 0]
  Branch (171:16): [Folded - Ignored]
172
0
                spec.retry.delay = DEFAULT_RETRY_DELAY;
173
0
            }
174
0
            if spec.retry.jitter == 0.0 {
  Branch (174:16): [True: 0, False: 0]
  Branch (174:16): [Folded - Ignored]
175
0
                spec.retry.jitter = DEFAULT_RETRY_JITTER;
176
0
            }
177
178
0
            let max_retries = u32::try_from(spec.retry.max_retries)
179
0
                .err_tip(|| "max_retries could not be converted to u32 in RedisStore::new")?;
180
0
            let min_delay_ms = (spec.retry.delay * 1000.0) as u32;
181
0
            let max_delay_ms = 8000;
182
0
            let jitter = (spec.retry.jitter * spec.retry.delay * 1000.0) as u32;
183
0
184
0
            let mut reconnect_policy = ReconnectPolicy::new_exponential(
185
0
                max_retries,  /* max_retries, 0 is unlimited */
186
0
                min_delay_ms, /* min_delay */
187
0
                max_delay_ms, /* max_delay */
188
0
                2,            /* mult */
189
0
            );
190
0
            reconnect_policy.set_jitter(jitter);
191
0
            reconnect_policy
192
0
        };
193
0
194
0
        {
195
0
            if spec.broadcast_channel_capacity == 0 {
  Branch (195:16): [True: 0, False: 0]
  Branch (195:16): [Folded - Ignored]
196
0
                spec.broadcast_channel_capacity = DEFAULT_BROADCAST_CHANNEL_CAPACITY;
197
0
            }
198
0
            if spec.connection_timeout_ms == 0 {
  Branch (198:16): [True: 0, False: 0]
  Branch (198:16): [Folded - Ignored]
199
0
                spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS;
200
0
            }
201
0
            if spec.command_timeout_ms == 0 {
  Branch (201:16): [True: 0, False: 0]
  Branch (201:16): [Folded - Ignored]
202
0
                spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS;
203
0
            }
204
0
            if spec.connection_pool_size == 0 {
  Branch (204:16): [True: 0, False: 0]
  Branch (204:16): [Folded - Ignored]
205
0
                spec.connection_pool_size = DEFAULT_CONNECTION_POOL_SIZE;
206
0
            }
207
0
            if spec.read_chunk_size == 0 {
  Branch (207:16): [True: 0, False: 0]
  Branch (207:16): [Folded - Ignored]
208
0
                spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
209
0
            }
210
0
            if spec.max_chunk_uploads_per_update == 0 {
  Branch (210:16): [True: 0, False: 0]
  Branch (210:16): [Folded - Ignored]
211
0
                spec.max_chunk_uploads_per_update = DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE;
212
0
            }
213
        }
214
0
        let connection_timeout = Duration::from_millis(spec.connection_timeout_ms);
215
0
        let command_timeout = Duration::from_millis(spec.command_timeout_ms);
216
0
217
0
        let mut builder = Builder::from_config(redis_config);
218
0
        builder
219
0
            .set_performance_config(PerformanceConfig {
220
0
                default_command_timeout: command_timeout,
221
0
                broadcast_channel_capacity: spec.broadcast_channel_capacity,
222
0
                ..Default::default()
223
0
            })
224
0
            .set_connection_config(ConnectionConfig {
225
0
                connection_timeout,
226
0
                internal_command_timeout: command_timeout,
227
0
                unresponsive: UnresponsiveConfig {
228
0
                    max_timeout: Some(connection_timeout),
229
0
                    // This number needs to be less than the connection timeout.
230
0
                    // We use 4 as it is a good balance between not spamming the server
231
0
                    // and not waiting too long.
232
0
                    interval: connection_timeout / 4,
233
0
                },
234
0
                ..Default::default()
235
0
            })
236
0
            .set_policy(reconnect_policy);
237
238
0
        let client_pool = builder
239
0
            .build_pool(spec.connection_pool_size)
240
0
            .err_tip(|| "while creating redis connection pool")?;
241
242
0
        let subscriber_client = builder
243
0
            .build_subscriber_client()
244
0
            .err_tip(|| "while creating redis subscriber client")?;
245
246
0
        Self::new_from_builder_and_parts(
247
0
            client_pool,
248
0
            subscriber_client,
249
0
            spec.experimental_pub_sub_channel.clone(),
250
0
            || Uuid::new_v4().to_string(),
251
0
            spec.key_prefix.clone(),
252
0
            spec.read_chunk_size,
253
0
            spec.max_chunk_uploads_per_update,
254
0
        )
255
0
        .map(Arc::new)
256
0
    }
257
258
    /// Used for testing when determinism is required.
259
10
    pub fn new_from_builder_and_parts(
260
10
        client_pool: RedisPool,
261
10
        subscriber_client: SubscriberClient,
262
10
        pub_sub_channel: Option<String>,
263
10
        temp_name_generator_fn: fn() -> String,
264
10
        key_prefix: String,
265
10
        read_chunk_size: usize,
266
10
        max_chunk_uploads_per_update: usize,
267
10
    ) -> Result<Self, Error> {
268
10
        // Start connection pool (this will retry forever by default).
269
10
        client_pool.connect();
270
10
        subscriber_client.connect();
271
10
272
10
        Ok(Self {
273
10
            client_pool,
274
10
            pub_sub_channel,
275
10
            subscriber_client,
276
10
            fingerprint_create_index: fingerprint_create_index_template(),
277
10
            temp_name_generator_fn,
278
10
            key_prefix,
279
10
            read_chunk_size,
280
10
            max_chunk_uploads_per_update,
281
10
            update_if_version_matches_script: Script::from_lua(LUA_VERSION_SET_SCRIPT),
282
10
            subscription_manager: Mutex::new(None),
283
10
        })
284
10
    }
285
286
    /// Encode a [`StoreKey`] so it can be sent to Redis.
287
21
    fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> {
288
21
        let key_body = key.as_str();
289
21
        if self.key_prefix.is_empty() {
  Branch (289:12): [True: 17, False: 4]
  Branch (289:12): [Folded - Ignored]
290
17
            key_body
291
        } else {
292
            // This is in the hot path for all redis operations, so we try to reuse the allocation
293
            // from `key.as_str()` if possible.
294
4
            match key_body {
295
4
                Cow::Owned(mut encoded_key) => {
296
4
                    encoded_key.insert_str(0, &self.key_prefix);
297
4
                    Cow::Owned(encoded_key)
298
                }
299
0
                Cow::Borrowed(body) => {
300
0
                    let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());
301
0
                    encoded_key.push_str(&self.key_prefix);
302
0
                    encoded_key.push_str(body);
303
0
                    Cow::Owned(encoded_key)
304
                }
305
            }
306
        }
307
21
    }
308
}
309
310
#[async_trait]
311
impl StoreDriver for RedisStore {
312
    async fn has_with_results(
313
        self: Pin<&Self>,
314
        keys: &[StoreKey<'_>],
315
        results: &mut [Option<u64>],
316
6
    ) -> Result<(), Error> {
317
        // TODO(allada) We could use pipeline here, but it makes retry more
318
        // difficult and it doesn't work very well in cluster mode.
319
        // If we wanted to optimize this with pipeline be careful to
320
        // implement retry and to support cluster mode.
321
6
        let client = self.client_pool.next();
322
6
        keys.iter()
323
6
            .zip(results.iter_mut())
324
6
            .map(|(key, result)| async move {
325
6
                // We need to do a special pass to ensure our zero key exist.
326
6
                if is_zero_digest(key.borrow()) {
  Branch (326:20): [True: 2, False: 4]
  Branch (326:20): [Folded - Ignored]
327
2
                    *result = Some(0);
328
2
                    return Ok::<_, Error>(());
329
4
                }
330
4
                let encoded_key = self.encode_key(key);
331
4
                let pipeline = client.pipeline();
332
4
                pipeline
333
4
                    .strlen::<(), _>(encoded_key.as_ref())
334
4
                    .await
335
4
                    .err_tip(|| {
336
0
                        format!("In RedisStore::has_with_results::strlen for {encoded_key}")
337
4
                    })
?0
;
338
                // Redis returns 0 when the key doesn't exist
339
                // AND when the key exists with value of length 0.
340
                // Therefore, we need to check both length and existence
341
                // and do it in a pipeline for efficiency.
342
4
                pipeline
343
4
                    .exists::<(), _>(encoded_key.as_ref())
344
4
                    .await
345
4
                    .err_tip(|| {
346
0
                        format!("In RedisStore::has_with_results::exists for {encoded_key}")
347
4
                    })
?0
;
348
4
                let (blob_len, exists) = pipeline
349
4
                    .all::<(u64, bool)>()
350
4
                    .await
351
4
                    .err_tip(|| 
"In RedisStore::has_with_results::query"0
)
?0
;
352
353
4
                *result = if exists { Some(blob_len) } else { 
None0
};
  Branch (353:30): [True: 4, False: 0]
  Branch (353:30): [Folded - Ignored]
354
355
4
                Ok::<_, Error>(())
356
12
            })
357
6
            .collect::<FuturesUnordered<_>>()
358
6
            .try_collect()
359
6
            .await
360
12
    }
361
362
    async fn update(
363
        self: Pin<&Self>,
364
        key: StoreKey<'_>,
365
        mut reader: DropCloserReadHalf,
366
        _upload_size: UploadSizeInfo,
367
7
    ) -> Result<(), Error> {
368
7
        let final_key = self.encode_key(&key);
369
7
370
7
        // While the name generation function can be supplied by the user, we need to have the curly
371
7
        // braces in place in order to manage redis' hashing behavior and make sure that the temporary
372
7
        // key name and the final key name are directed to the same cluster node. See
373
7
        // https://redis.io/blog/redis-clustering-best-practices-with-keys/
374
7
        //
375
7
        // The TL;DR is that if we're in cluster mode and the names hash differently, we can't use request
376
7
        // pipelining. By using these braces, we tell redis to only hash the part of the temporary key that's
377
7
        // identical to the final key -- so they will always hash to the same node.
378
7
        let temp_key = format!(
379
7
            "temp-{}-{{{}}}",
380
7
            (self.temp_name_generator_fn)(),
381
7
            &final_key
382
7
        );
383
7
384
7
        if is_zero_digest(key.borrow()) {
  Branch (384:12): [True: 2, False: 5]
  Branch (384:12): [Folded - Ignored]
385
2
            let chunk = reader
386
2
                .peek()
387
2
                .await
388
2
                .err_tip(|| 
"Failed to peek in RedisStore::update"0
)
?0
;
389
2
            if chunk.is_empty() {
  Branch (389:16): [True: 2, False: 0]
  Branch (389:16): [Folded - Ignored]
390
2
                reader
391
2
                    .drain()
392
2
                    .await
393
2
                    .err_tip(|| 
"Failed to drain in RedisStore::update"0
)
?0
;
394
                // Zero-digest keys are special -- we don't need to do anything with them.
395
2
                return Ok(());
396
0
            }
397
5
        }
398
399
5
        let client = self.client_pool.next();
400
5
401
5
        let mut read_stream = reader
402
6
            .scan(0u32, |bytes_read, chunk_res| {
403
6
                future::ready(Some(
404
6
                    chunk_res
405
6
                        .err_tip(|| 
"Failed to read chunk in update in redis store"1
)
406
6
                        .and_then(|chunk| {
407
5
                            let offset = *bytes_read;
408
5
                            let chunk_len = u32::try_from(chunk.len()).err_tip(|| {
409
0
                                "Could not convert chunk length to u32 in RedisStore::update"
410
5
                            })
?0
;
411
5
                            let new_bytes_read = bytes_read
412
5
                                .checked_add(chunk_len)
413
5
                                .err_tip(|| 
"Overflow protection in RedisStore::update"0
)
?0
;
414
5
                            *bytes_read = new_bytes_read;
415
5
                            Ok::<_, Error>((offset, *bytes_read, chunk))
416
6
                        
}5
),
417
6
                ))
418
6
            })
419
6
            .map(|res| {
420
6
                let (
offset, end_pos, chunk5
) = res
?1
;
421
5
                let temp_key_ref = &temp_key;
422
5
                Ok(async move {
423
5
                    client
424
5
                        .setrange::<(), _, _>(temp_key_ref, offset, chunk)
425
5
                        .await
426
5
                        .err_tip(|| {
427
0
                            "While appending to append to temp key in RedisStore::update"
428
5
                        })
?0
;
429
5
                    Ok::<u32, Error>(end_pos)
430
5
                })
431
6
            })
432
5
            .try_buffer_unordered(self.max_chunk_uploads_per_update);
433
5
434
5
        let mut total_len: u32 = 0;
435
10
        while let Some(
last_pos5
) = read_stream.try_next().await
?1
{
  Branch (435:19): [True: 5, False: 4]
  Branch (435:19): [Folded - Ignored]
436
5
            if last_pos > total_len {
  Branch (436:16): [True: 5, False: 0]
  Branch (436:16): [Folded - Ignored]
437
5
                total_len = last_pos;
438
5
            
}0
439
        }
440
441
4
        let blob_len = client
442
4
            .strlen::<u64, _>(&temp_key)
443
4
            .await
444
4
            .err_tip(|| 
format!("In RedisStore::update strlen check for {temp_key}")0
)
?0
;
445
        // This is a safety check to ensure that in the event some kind of retry was to happen
446
        // and the data was appended to the key twice, we reject the data.
447
4
        if blob_len != u64::from(total_len) {
  Branch (447:12): [True: 0, False: 4]
  Branch (447:12): [Folded - Ignored]
448
0
            return Err(make_input_err!(
449
0
                "Data length mismatch in RedisStore::update for {}({}) - expected {} bytes, got {} bytes",
450
0
                key.borrow().as_str(),
451
0
                temp_key,
452
0
                total_len,
453
0
                blob_len,
454
0
            ));
455
4
        }
456
4
457
4
        // Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost.
458
4
        client
459
4
            .rename::<(), _, _>(&temp_key, final_key.as_ref())
460
4
            .await
461
4
            .err_tip(|| 
"While queueing key rename in RedisStore::update()"0
)
?0
;
462
463
        // If we have a publish channel configured, send a notice that the key has been set.
464
4
        if let Some(
pub_sub_channel0
) = &self.pub_sub_channel {
  Branch (464:16): [True: 0, False: 4]
  Branch (464:16): [Folded - Ignored]
465
0
            return Ok(client.publish(pub_sub_channel, final_key.as_ref()).await?);
466
4
        };
467
4
468
4
        Ok(())
469
14
    }
470
471
    async fn get_part(
472
        self: Pin<&Self>,
473
        key: StoreKey<'_>,
474
        writer: &mut DropCloserWriteHalf,
475
        offset: u64,
476
        length: Option<u64>,
477
5
    ) -> Result<(), Error> {
478
5
        let offset = usize::try_from(offset).err_tip(|| 
"Could not convert offset to usize"0
)
?0
;
479
5
        let length = length
480
5
            .map(|v| 
usize::try_from(v).err_tip(4
||
"Could not convert length to usize"0
)4
)
481
5
            .transpose()
?0
;
482
483
        // To follow RBE spec we need to consider any digests with
484
        // zero size to be existing.
485
5
        if is_zero_digest(key.borrow()) {
  Branch (485:12): [True: 0, False: 5]
  Branch (485:12): [Folded - Ignored]
486
0
            return writer
487
0
                .send_eof()
488
0
                .err_tip(|| "Failed to send zero EOF in redis store get_part");
489
5
        }
490
5
491
5
        let client = self.client_pool.next();
492
5
        let encoded_key = self.encode_key(&key);
493
5
        let encoded_key = encoded_key.as_ref();
494
5
495
5
        // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we
496
5
        // do math with indices we convert our exclusive end positions into inclusive ones.
497
5
498
5
        // We want to read the data at the key from `offset` to `offset + length`.
499
5
        let data_start = offset;
500
5
        let data_end = data_start
501
5
            .saturating_add(length.unwrap_or(isize::MAX as usize))
502
5
            .saturating_sub(1);
503
5
504
5
        // And we don't ever want to read more than `read_chunk_size` bytes at a time, so we'll need to iterate.
505
5
        let mut chunk_start = data_start;
506
5
        let mut chunk_end = cmp::min(
507
5
            data_start.saturating_add(self.read_chunk_size) - 1,
508
5
            data_end,
509
5
        );
510
511
        loop {
512
8
            let chunk: Bytes = client
513
8
                .getrange(encoded_key, chunk_start, chunk_end)
514
8
                .await
515
8
                .err_tip(|| 
"In RedisStore::get_part::getrange"0
)
?0
;
516
517
8
            let didnt_receive_full_chunk = chunk.len() < self.read_chunk_size;
518
8
            let reached_end_of_data = chunk_end == data_end;
519
8
520
8
            if didnt_receive_full_chunk || 
reached_end_of_data4
{
  Branch (520:16): [True: 4, False: 4]
  Branch (520:44): [True: 1, False: 3]
  Branch (520:16): [Folded - Ignored]
  Branch (520:44): [Folded - Ignored]
521
5
                if !chunk.is_empty() {
  Branch (521:20): [True: 4, False: 1]
  Branch (521:20): [Folded - Ignored]
522
4
                    writer
523
4
                        .send(chunk)
524
4
                        .await
525
4
                        .err_tip(|| 
"Failed to write data in RedisStore::get_part"0
)
?0
;
526
1
                }
527
528
5
                break; // No more data to read.
529
3
            }
530
3
531
3
            // We received a full chunk's worth of data, so write it...
532
3
            writer
533
3
                .send(chunk)
534
3
                .await
535
3
                .err_tip(|| 
"Failed to write data in RedisStore::get_part"0
)
?0
;
536
537
            // ...and go grab the next chunk.
538
3
            chunk_start = chunk_end + 1;
539
3
            chunk_end = cmp::min(
540
3
                chunk_start.saturating_add(self.read_chunk_size) - 1,
541
3
                data_end,
542
3
            );
543
        }
544
545
        // If we didn't write any data, check if the key exists, if not return a NotFound error.
546
        // This is required by spec.
547
5
        if writer.get_bytes_written() == 0 {
  Branch (547:12): [True: 1, False: 4]
  Branch (547:12): [Folded - Ignored]
548
            // We're supposed to read 0 bytes, so just check if the key exists.
549
1
            let exists = client
550
1
                .exists::<bool, _>(encoded_key)
551
1
                .await
552
1
                .err_tip(|| 
"In RedisStore::get_part::zero_exists"0
)
?0
;
553
554
1
            if !exists {
  Branch (554:16): [True: 1, False: 0]
  Branch (554:16): [Folded - Ignored]
555
1
                return Err(make_err!(
556
1
                    Code::NotFound,
557
1
                    "Data not found in Redis store for digest: {key:?}"
558
1
                ));
559
0
            }
560
4
        }
561
562
4
        writer
563
4
            .send_eof()
564
4
            .err_tip(|| 
"Failed to write EOF in redis store get_part"0
)
565
10
    }
566
567
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
568
0
        self
569
0
    }
570
571
0
    fn as_any(&self) -> &(dyn std::any::Any + Sync + Send) {
572
0
        self
573
0
    }
574
575
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send> {
576
0
        self
577
0
    }
578
579
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
580
0
        registry.register_indicator(self);
581
0
    }
582
}
583
584
#[async_trait]
585
impl HealthStatusIndicator for RedisStore {
586
0
    fn get_name(&self) -> &'static str {
587
0
        "RedisStore"
588
0
    }
589
590
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
591
0
        StoreDriver::check_health(Pin::new(self), namespace).await
592
0
    }
593
}
594
595
// -------------------------------------------------------------------
596
// Below this line are specific to the redis scheduler implementation.
597
// -------------------------------------------------------------------
598
599
/// The maximum number of results to return per cursor.
600
const MAX_COUNT_PER_CURSOR: u64 = 256;
601
/// The time in milliseconds that a redis cursor can be idle before it is closed.
602
const CURSOR_IDLE_MS: u64 = 2_000;
603
/// The name of the field in the Redis hash that stores the data.
604
const DATA_FIELD_NAME: &str = "data";
605
/// The name of the field in the Redis hash that stores the version.
606
const VERSION_FIELD_NAME: &str = "version";
607
/// The time to live of indexes in seconds. After this time redis may delete the index.
608
const INDEX_TTL_S: u64 = 60 * 60 * 24; // 24 hours.
609
610
/// String of the `FT.CREATE` command used to create the index template. It is done this
611
/// way so we can use it in both const (compile time) functions and runtime functions.
612
/// This is a macro because we need to use it in multiple places that sometimes require the
613
/// data as different data types (specifically for rust's `format_args!` macro).
614
macro_rules! get_create_index_template {
615
    () => {
616
        "FT.CREATE {} ON HASH PREFIX 1 {} NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA {} TAG CASESENSITIVE SORTABLE"
617
    }
618
}
619
620
/// Lua script to set a key if the version matches.
621
/// Args:
622
///   KEYS[1]: The key where the version is stored.
623
///   ARGV[1]: The expected version.
624
///   ARGV[2]: The new data.
625
///   ARGV[3*]: Key-value pairs of additional data to include.
626
/// Returns:
627
///   The new version if the version matches. nil is returned if the
628
///   value was not set.
629
const LUA_VERSION_SET_SCRIPT: &str = formatcp!(
630
    r"
631
local key = KEYS[1]
632
local expected_version = tonumber(ARGV[1])
633
local new_data = ARGV[2]
634
local new_version = redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', 1)
635
local i
636
local indexes = {{}}
637
638
if new_version-1 ~= expected_version then
639
    redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', -1)
640
    return 0
641
end
642
-- Skip first 2 argvs, as they are known inputs.
643
-- Remember: Lua is 1-indexed.
644
for i=3, #ARGV do
645
    indexes[i-2] = ARGV[i]
646
end
647
648
-- In testing we witnessed redis sometimes not update our FT indexes
649
-- resulting in stale data. It appears if we delete our keys then insert
650
-- them again it works and reduces risk significantly.
651
redis.call('DEL', key)
652
redis.call('HSET', key, '{DATA_FIELD_NAME}', new_data, '{VERSION_FIELD_NAME}', new_version, unpack(indexes))
653
654
return new_version
655
"
656
);
657
658
/// Compile-time fingerprint of the `FT.CREATE` command used to create the index template.
659
/// This is a simple CRC32 checksum of the command string. We don't care about it actually
660
/// being a valid CRC32 checksum, just that it's a unique identifier with a low chance of
661
/// collision.
662
13
const fn fingerprint_create_index_template() -> u32 {
663
    const POLY: u32 = 0xEDB8_8320;
664
    const DATA: &[u8] = get_create_index_template!().as_bytes();
665
13
    let mut crc = 0xFFFF_FFFF;
666
13
    let mut i = 0;
667
1.32k
    while i < DATA.len() {
  Branch (667:11): [True: 1.31k, False: 13]
  Branch (667:11): [Folded - Ignored]
668
1.31k
        let byte = DATA[i];
669
1.31k
        crc ^= byte as u32;
670
1.31k
671
1.31k
        let mut j = 0;
672
11.8k
        while j < 8 {
  Branch (672:15): [True: 10.5k, False: 1.31k]
  Branch (672:15): [Folded - Ignored]
673
10.5k
            crc = if crc & 1 != 0 {
  Branch (673:22): [True: 5.01k, False: 5.48k]
  Branch (673:22): [Folded - Ignored]
674
5.01k
                (crc >> 1) ^ POLY
675
            } else {
676
5.48k
                crc >> 1
677
            };
678
10.5k
            j += 1;
679
        }
680
1.31k
        i += 1;
681
    }
682
13
    crc
683
13
}
684
685
/// Builds the name of the index used for a given field.
///
/// The name is `<prefix>_<field>_<sort key or empty>_<fingerprint>`, where the
/// fingerprint is the 8 digit hex value of `fingerprint_create_index_template()`.
/// Including the fingerprint means a changed index definition results in a new name.
///
/// Expands to a `format_args!` invocation, so the result must be consumed within the
/// expression it is used in (for example as an argument to `format!`).
macro_rules! get_index_name {
    ($prefix:expr, $field:expr, $maybe_sort:expr) => {
        format_args!(
            "{}_{}_{}_{:08x}",
            $prefix,
            $field,
            $maybe_sort.unwrap_or(""),
            fingerprint_create_index_template(),
        )
    };
}
699
700
/// Checks that a string is safe to be used as part of a Redis key.
///
/// Nothing is rewritten: the input is handed back unchanged when every byte is an ASCII
/// letter, an ASCII digit or `_`, and `None` is returned otherwise. An empty string is
/// accepted.
const fn try_sanitize(s: &str) -> Option<&str> {
    // Note: `for` loops and iterators are not available in const functions. Keeping this
    // function const lets the compiler evaluate it at compile time for constant inputs.
    let bytes = s.as_bytes();
    let mut idx = 0;
    while idx < bytes.len() {
        match bytes[idx] {
            b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'_' => idx += 1,
            _ => return None,
        }
    }
    Some(s)
}
721
722
/// An individual subscription to a key in Redis.
723
pub struct RedisSubscription {
724
    receiver: Option<tokio::sync::watch::Receiver<String>>,
725
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
726
}
727
728
impl SchedulerSubscription for RedisSubscription {
729
    /// Wait for the subscription key to change.
730
7
    async fn changed(&mut self) -> Result<(), Error> {
731
7
        let receiver = self
732
7
            .receiver
733
7
            .as_mut()
734
7
            .ok_or_else(|| 
make_err!(Code::Internal, "In RedisSubscription::changed::as_mut")0
)
?0
;
735
7
        receiver
736
7
            .changed()
737
7
            .await
738
6
            .map_err(|_| 
make_err!(Code::Internal, "In RedisSubscription::changed::changed")0
)
739
6
    }
740
}
741
742
// If the subscription is dropped, we need to possibly remove the key from the
743
// subscribed keys map.
744
impl Drop for RedisSubscription {
745
2
    fn drop(&mut self) {
746
2
        let Some(receiver) = self.receiver.take() else {
  Branch (746:13): [True: 2, False: 0]
  Branch (746:13): [Folded - Ignored]
747
0
            event!(
748
0
                Level::WARN,
749
0
                "RedisSubscription has already been dropped, nothing to do."
750
            );
751
0
            return; // Already dropped, nothing to do.
752
        };
753
2
        let key = receiver.borrow().clone();
754
2
        // IMPORTANT: This must be dropped before receiver_count() is called.
755
2
        drop(receiver);
756
2
        let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else {
  Branch (756:13): [True: 2, False: 0]
  Branch (756:13): [Folded - Ignored]
757
0
            return; // Already dropped, nothing to do.
758
        };
759
2
        let mut subscribed_keys = subscribed_keys.write();
760
2
        let Some(value) = subscribed_keys.get(&key) else {
  Branch (760:13): [True: 2, False: 0]
  Branch (760:13): [Folded - Ignored]
761
0
            event!(
762
0
                Level::ERROR,
763
0
                "Key {key} was not found in subscribed keys when checking if it should be removed."
764
            );
765
0
            return;
766
        };
767
        // If we have no receivers, cleanup the entry from our map.
768
2
        if value.receiver_count() == 0 {
  Branch (768:12): [True: 2, False: 0]
  Branch (768:12): [Folded - Ignored]
769
2
            subscribed_keys.remove(key);
770
2
        
}0
771
2
    }
772
}
773
774
/// A publisher for a key in Redis.
775
struct RedisSubscriptionPublisher {
776
    sender: Mutex<tokio::sync::watch::Sender<String>>,
777
}
778
779
impl RedisSubscriptionPublisher {
780
2
    fn new(
781
2
        key: String,
782
2
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
783
2
    ) -> (Self, RedisSubscription) {
784
2
        let (sender, receiver) = tokio::sync::watch::channel(key);
785
2
        let publisher = Self {
786
2
            sender: Mutex::new(sender),
787
2
        };
788
2
        let subscription = RedisSubscription {
789
2
            receiver: Some(receiver),
790
2
            weak_subscribed_keys,
791
2
        };
792
2
        (publisher, subscription)
793
2
    }
794
795
0
    fn subscribe(
796
0
        &self,
797
0
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
798
0
    ) -> RedisSubscription {
799
0
        let receiver = self.sender.lock().subscribe();
800
0
        RedisSubscription {
801
0
            receiver: Some(receiver),
802
0
            weak_subscribed_keys,
803
0
        }
804
0
    }
805
806
2
    fn receiver_count(&self) -> usize {
807
2
        self.sender.lock().receiver_count()
808
2
    }
809
810
4
    fn notify(&self) {
811
4
        // TODO(https://github.com/sile/patricia_tree/issues/40) When this is addressed
812
4
        // we can remove the `Mutex` and use the mutable iterator directly.
813
4
        self.sender.lock().send_modify(|_| {});
814
4
    }
815
}
816
817
pub struct RedisSubscriptionManager {
818
    subscribed_keys: Arc<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
819
    tx_for_test: tokio::sync::mpsc::UnboundedSender<String>,
820
    _subscription_spawn: JoinHandleDropGuard<()>,
821
}
822
823
impl RedisSubscriptionManager {
824
1
    pub fn new(subscribe_client: SubscriberClient, pub_sub_channel: String) -> Self {
825
1
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
826
1
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);
827
1
        let (tx_for_test, mut rx_for_test) = tokio::sync::mpsc::unbounded_channel();
828
1
        Self {
829
1
            subscribed_keys,
830
1
            tx_for_test,
831
1
            _subscription_spawn: spawn!("redis_subscribe_spawn", async move {
832
1
                let mut rx = subscribe_client.message_rx();
833
                loop {
834
1
                    if let Err(
e0
) = subscribe_client.subscribe(&pub_sub_channel).await {
  Branch (834:28): [True: 0, False: 1]
  Branch (834:28): [Folded - Ignored]
835
0
                        event!(Level::ERROR, "Error subscribing to pattern - {e}");
836
0
                        return;
837
1
                    }
838
1
                    let mut reconnect_rx = subscribe_client.reconnect_rx();
839
1
                    let reconnect_fut = reconnect_rx.recv().fuse();
840
1
                    tokio::pin!(reconnect_fut);
841
                    loop {
842
4
                        let 
key3
= select! {
843
4
                            
value3
= rx_for_test.recv() => {
844
3
                                let Some(value) = value else {
  Branch (844:37): [True: 3, False: 0]
  Branch (844:37): [Folded - Ignored]
845
0
                                    unreachable!("Channel should never close");
846
                                };
847
3
                                value.into()
848
                            },
849
4
                            
msg0
= rx.recv() => {
850
0
                                match msg {
851
0
                                    Ok(msg) => {
852
0
                                        if let RedisValue::String(s) = msg.value {
  Branch (852:48): [True: 0, False: 0]
  Branch (852:48): [Folded - Ignored]
853
0
                                            s
854
                                        } else {
855
0
                                            event!(Level::ERROR, "Received non-string message in RedisSubscriptionManager");
856
0
                                            continue;
857
                                        }
858
                                    },
859
0
                                    Err(e) => {
860
0
                                        // Check to see if our parent has been dropped and if so kill spawn.
861
0
                                        if subscribed_keys_weak.upgrade().is_none() {
  Branch (861:44): [True: 0, False: 0]
  Branch (861:44): [Folded - Ignored]
862
0
                                            event!(Level::WARN, "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn");
863
0
                                            return;
864
0
                                        };
865
0
                                        event!(Level::ERROR, "Error receiving message in RedisSubscriptionManager reconnecting and flagging everything changed - {e}");
866
0
                                        break;
867
                                    }
868
                                }
869
                            },
870
4
                            _ = &mut reconnect_fut => {
871
0
                                event!(Level::WARN, "Redis reconnected flagging all subscriptions as changed and resuming");
872
0
                                break;
873
                            }
874
                        };
875
3
                        let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
  Branch (875:29): [True: 3, False: 0]
  Branch (875:29): [Folded - Ignored]
876
0
                            event!(Level::WARN, "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn");
877
0
                            return;
878
                        };
879
3
                        let subscribed_keys_mux = subscribed_keys.read();
880
3
                        subscribed_keys_mux
881
3
                            .common_prefix_values(&*key)
882
3
                            .for_each(RedisSubscriptionPublisher::notify);
883
                    }
884
                    // Sleep for a small amount of time to ensure we don't reconnect too quickly.
885
0
                    sleep(Duration::from_secs(1)).await;
886
                    // If we reconnect or lag behind we might have had dirty keys, so we need to
887
                    // flag all of them as changed.
888
0
                    let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
  Branch (888:25): [True: 0, False: 0]
  Branch (888:25): [Folded - Ignored]
889
0
                        event!(Level::WARN, "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn");
890
0
                        return;
891
                    };
892
0
                    let subscribed_keys_mux = subscribed_keys.read();
893
0
                    // Just in case also get a new receiver.
894
0
                    rx = subscribe_client.message_rx();
895
0
                    // Drop all buffered messages, then flag everything as changed.
896
0
                    rx.resubscribe();
897
0
                    for publisher in subscribed_keys_mux.values() {
898
0
                        publisher.notify();
899
0
                    }
900
                }
901
1
            
}0
),
902
        }
903
1
    }
904
}
905
906
impl SchedulerSubscriptionManager for RedisSubscriptionManager {
907
    type Subscription = RedisSubscription;
908
909
3
    fn notify_for_test(&self, value: String) {
910
3
        self.tx_for_test.send(value).unwrap();
911
3
    }
912
913
2
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
914
2
    where
915
2
        K: SchedulerStoreKeyProvider,
916
2
    {
917
2
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
918
2
        let mut subscribed_keys = self.subscribed_keys.write();
919
2
        let key = key.get_key();
920
2
        let key_str = key.as_str();
921
2
        let mut subscription = if let Some(
publisher0
) = subscribed_keys.get(&key_str) {
  Branch (921:39): [True: 0, False: 0]
  Branch (921:39): [Folded - Ignored]
  Branch (921:39): [Folded - Ignored]
  Branch (921:39): [True: 0, False: 2]
922
0
            publisher.subscribe(weak_subscribed_keys)
923
        } else {
924
2
            let (publisher, subscription) =
925
2
                RedisSubscriptionPublisher::new(key_str.to_string(), weak_subscribed_keys);
926
2
            subscribed_keys.insert(key_str, publisher);
927
2
            subscription
928
        };
929
2
        subscription
930
2
            .receiver
931
2
            .as_mut()
932
2
            .ok_or_else(|| {
933
0
                make_err!(
934
0
                    Code::Internal,
935
0
                    "Receiver should be set in RedisSubscriptionManager::subscribe"
936
0
                )
937
2
            })
?0
938
2
            .mark_changed();
939
2
940
2
        Ok(subscription)
941
2
    }
942
}
943
944
impl SchedulerStore for RedisStore {
945
    type SubscriptionManager = RedisSubscriptionManager;
946
947
3
    fn subscription_manager(&self) -> Result<Arc<RedisSubscriptionManager>, Error> {
948
3
        let mut subscription_manager = self.subscription_manager.lock();
949
950
3
        if let Some(
subscription_manager2
) = &*subscription_manager {
  Branch (950:16): [True: 2, False: 1]
  Branch (950:16): [Folded - Ignored]
951
2
            Ok(subscription_manager.clone())
952
        } else {
953
1
            let Some(pub_sub_channel) = &self.pub_sub_channel else {
  Branch (953:17): [True: 1, False: 0]
  Branch (953:17): [Folded - Ignored]
954
0
                return Err(make_input_err!("RedisStore must have a pubsub channel for a Redis Scheduler if using subscriptions"));
955
            };
956
1
            let sub = Arc::new(RedisSubscriptionManager::new(
957
1
                self.subscriber_client.clone(),
958
1
                pub_sub_channel.clone(),
959
1
            ));
960
1
            *subscription_manager = Some(sub.clone());
961
1
            Ok(sub)
962
        }
963
3
    }
964
965
3
    async fn update_data<T>(&self, data: T) -> Result<Option<u64>, Error>
966
3
    where
967
3
        T: SchedulerStoreDataProvider
968
3
            + SchedulerStoreKeyProvider
969
3
            + SchedulerCurrentVersionProvider
970
3
            + Send,
971
3
    {
972
3
        let key = data.get_key();
973
3
        let key = self.encode_key(&key);
974
3
        let client = self.client_pool.next();
975
3
        let maybe_index = data.get_indexes().err_tip(|| {
976
0
            format!("Err getting index in RedisStore::update_data::versioned for {key:?}")
977
3
        })
?0
;
978
3
        if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {
  Branch (978:12): [True: 0, False: 0]
  Branch (978:12): [True: 0, False: 0]
  Branch (978:12): [Folded - Ignored]
  Branch (978:12): [Folded - Ignored]
  Branch (978:12): [True: 0, False: 1]
  Branch (978:12): [True: 2, False: 0]
979
2
            let current_version = data.current_version();
980
2
            let data = data.try_into_bytes().err_tip(|| {
981
0
                format!("Could not convert value to bytes in RedisStore::update_data::versioned for {key:?}")
982
2
            })
?0
;
983
2
            let mut argv = Vec::with_capacity(3 + maybe_index.len() * 2);
984
2
            argv.push(Bytes::from(format!("{current_version}")));
985
2
            argv.push(data);
986
8
            for (
name, value6
) in maybe_index {
987
6
                argv.push(Bytes::from_static(name.as_bytes()));
988
6
                argv.push(value);
989
6
            }
990
2
            let new_version = self
991
2
                .update_if_version_matches_script
992
2
                .evalsha_with_reload::<u64, _, Vec<Bytes>>(client, vec![key.as_ref()], argv)
993
2
                .await
994
2
                .err_tip(|| 
format!("In RedisStore::update_data::versioned for {key:?}")0
)
?0
;
995
2
            if new_version == 0 {
  Branch (995:16): [True: 0, False: 0]
  Branch (995:16): [True: 0, False: 0]
  Branch (995:16): [Folded - Ignored]
  Branch (995:16): [Folded - Ignored]
  Branch (995:16): [True: 0, False: 0]
  Branch (995:16): [True: 0, False: 2]
996
0
                return Ok(None);
997
2
            }
998
            // If we have a publish channel configured, send a notice that the key has been set.
999
2
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (999:20): [True: 0, False: 0]
  Branch (999:20): [True: 0, False: 0]
  Branch (999:20): [Folded - Ignored]
  Branch (999:20): [Folded - Ignored]
  Branch (999:20): [True: 0, False: 0]
  Branch (999:20): [True: 2, False: 0]
1000
2
                return Ok(client.publish(pub_sub_channel, key.as_ref()).await
?0
);
1001
0
            };
1002
0
            Ok(Some(new_version))
1003
        } else {
1004
1
            let data = data.try_into_bytes().err_tip(|| {
1005
0
                format!("Could not convert value to bytes in RedisStore::update_data::noversion for {key:?}")
1006
1
            })
?0
;
1007
1
            let mut fields = RedisMap::new();
1008
1
            fields.reserve(1 + maybe_index.len());
1009
1
            fields.insert(DATA_FIELD_NAME.into(), data.into());
1010
1
            for (
name, value0
) in maybe_index {
1011
0
                fields.insert(name.into(), value.into());
1012
0
            }
1013
1
            client
1014
1
                .hset::<(), _, _>(key.as_ref(), fields)
1015
1
                .await
1016
1
                .err_tip(|| 
format!("In RedisStore::update_data::noversion for {key:?}")0
)
?0
;
1017
            // If we have a publish channel configured, send a notice that the key has been set.
1018
1
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (1018:20): [True: 0, False: 0]
  Branch (1018:20): [True: 0, False: 0]
  Branch (1018:20): [Folded - Ignored]
  Branch (1018:20): [Folded - Ignored]
  Branch (1018:20): [True: 1, False: 0]
  Branch (1018:20): [True: 0, False: 0]
1019
1
                return Ok(client.publish(pub_sub_channel, key.as_ref()).await
?0
);
1020
0
            };
1021
0
            Ok(Some(0)) // Always use "0" version since this is not a versioned request.
1022
        }
1023
3
    }
1024
1025
1
    async fn search_by_index_prefix<K>(
1026
1
        &self,
1027
1
        index: K,
1028
1
    ) -> Result<
1029
1
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
1030
1
        Error,
1031
1
    >
1032
1
    where
1033
1
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
1034
1
    {
1035
1
        let index_value = index.index_value();
1036
2
        let run_ft_aggregate = || {
1037
2
            let client = self.client_pool.next().clone();
1038
2
            let sanitized_field = try_sanitize(index_value.as_ref()).err_tip(|| {
1039
0
                format!("In RedisStore::search_by_index_prefix::try_sanitize - {index_value:?}")
1040
2
            })
?0
;
1041
2
            Ok::<_, Error>(async move {
1042
2
                ft_aggregate(
1043
2
                    client,
1044
2
                    format!(
1045
2
                        "{}",
1046
2
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1047
2
                    ),
1048
2
                    format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_field),
1049
2
                    FtAggregateOptions {
1050
2
                        load: Some(Load::Some(vec![
1051
2
                            SearchField {
1052
2
                                identifier: DATA_FIELD_NAME.into(),
1053
2
                                property: None,
1054
2
                            },
1055
2
                            SearchField {
1056
2
                                identifier: VERSION_FIELD_NAME.into(),
1057
2
                                property: None,
1058
2
                            },
1059
2
                        ])),
1060
2
                        cursor: Some(WithCursor {
1061
2
                            count: Some(MAX_COUNT_PER_CURSOR),
1062
2
                            max_idle: Some(CURSOR_IDLE_MS),
1063
2
                        }),
1064
2
                        pipeline: vec![AggregateOperation::SortBy {
1065
2
                            properties: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| {
1066
0
                                vec![(format!("@{v}").into(), SortOrder::Asc)]
1067
2
                            }),
1068
2
                            max: None,
1069
2
                        }],
1070
2
                        ..Default::default()
1071
2
                    },
1072
2
                )
1073
2
                .await
1074
2
            })
1075
2
        };
1076
1
        let stream = run_ft_aggregate()
?0
1077
1
            .or_else(|_| async move {
1078
1
                let mut schema = vec![SearchSchema {
1079
1
                    field_name: K::INDEX_NAME.into(),
1080
1
                    alias: None,
1081
1
                    kind: SearchSchemaKind::Tag {
1082
1
                        sortable: false,
1083
1
                        unf: false,
1084
1
                        separator: None,
1085
1
                        casesensitive: false,
1086
1
                        withsuffixtrie: false,
1087
1
                        noindex: false,
1088
1
                    },
1089
1
                }];
1090
1
                if let Some(
sort_key0
) = K::MAYBE_SORT_KEY {
  Branch (1090:24): [True: 0, False: 0]
  Branch (1090:24): [True: 0, False: 0]
  Branch (1090:24): [Folded - Ignored]
  Branch (1090:24): [Folded - Ignored]
  Branch (1090:24): [True: 0, False: 1]
1091
0
                    schema.push(SearchSchema {
1092
0
                        field_name: sort_key.into(),
1093
0
                        alias: None,
1094
0
                        kind: SearchSchemaKind::Tag {
1095
0
                            sortable: true,
1096
0
                            unf: false,
1097
0
                            separator: None,
1098
0
                            casesensitive: false,
1099
0
                            withsuffixtrie: false,
1100
0
                            noindex: false,
1101
0
                        },
1102
0
                    });
1103
1
                }
1104
1
                let create_result = self
1105
1
                    .client_pool
1106
1
                    .next()
1107
1
                    .ft_create::<(), _>(
1108
1
                        format!(
1109
1
                            "{}",
1110
1
                            get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1111
1
                        ),
1112
1
                        FtCreateOptions {
1113
1
                            on: Some(IndexKind::Hash),
1114
1
                            prefixes: vec![K::KEY_PREFIX.into()],
1115
1
                            nohl: true,
1116
1
                            nofields: true,
1117
1
                            nofreqs: true,
1118
1
                            nooffsets: true,
1119
1
                            temporary: Some(INDEX_TTL_S),
1120
1
                            ..Default::default()
1121
1
                        },
1122
1
                        schema,
1123
1
                    )
1124
1
                    .await
1125
1
                    .err_tip(|| {
1126
0
                        format!(
1127
0
                            "Error with ft_create in RedisStore::search_by_index_prefix({})",
1128
0
                            get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1129
0
                        )
1130
1
                    });
1131
1
                let run_result = run_ft_aggregate()
?0
.await.err_tip(|| {
1132
0
                    format!(
1133
0
                        "Error with second ft_aggregate in RedisStore::search_by_index_prefix({})",
1134
0
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1135
0
                    )
1136
1
                });
1137
1
                // Creating the index will race which is ok. If it fails to create, we only
1138
1
                // error if the second ft_aggregate call fails and fails to create.
1139
1
                run_result.or_else(move |e| 
create_result.merge(Err(e))0
)
1140
2
            })
1141
1
            .await
?0
;
1142
1
        Ok(stream.map(|result| 
{0
1143
0
            let mut redis_map =
1144
0
                result.err_tip(|| "Error in stream of in RedisStore::search_by_index_prefix")?;
1145
0
            let bytes_data = redis_map
1146
0
                .remove(&RedisKey::from_static_str(DATA_FIELD_NAME))
1147
0
                .err_tip(|| "Missing data field in RedisStore::search_by_index_prefix")?
1148
0
                .into_bytes()
1149
0
                .err_tip(|| {
1150
0
                    formatcp!("'{DATA_FIELD_NAME}' is not Bytes in RedisStore::search_by_index_prefix::into_bytes")
1151
0
                })?;
1152
0
            let version = if <K as SchedulerIndexProvider>::Versioned::VALUE {
  Branch (1152:30): [True: 0, False: 0]
  Branch (1152:30): [True: 0, False: 0]
  Branch (1152:30): [Folded - Ignored]
  Branch (1152:30): [Folded - Ignored]
  Branch (1152:30): [True: 0, False: 0]
1153
0
                redis_map
1154
0
                    .remove(&RedisKey::from_static_str(VERSION_FIELD_NAME))
1155
0
                    .err_tip(|| "Missing version field in RedisStore::search_by_index_prefix")?
1156
0
                    .as_u64()
1157
0
                    .err_tip(|| {
1158
0
                        formatcp!("'{VERSION_FIELD_NAME}' is not u64 in RedisStore::search_by_index_prefix::as_u64")
1159
0
                    })?
1160
            } else {
1161
0
                0
1162
            };
1163
0
            K::decode(version, bytes_data)
1164
0
                .err_tip(|| "In RedisStore::search_by_index_prefix::decode")
1165
1
        
}0
))
1166
1
    }
1167
1168
2
    async fn get_and_decode<K>(
1169
2
        &self,
1170
2
        key: K,
1171
2
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
1172
2
    where
1173
2
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
1174
2
    {
1175
2
        let key = key.get_key();
1176
2
        let key = self.encode_key(&key);
1177
2
        let client = self.client_pool.next();
1178
2
        let (maybe_version, maybe_data) = client
1179
2
            .hmget::<(Option<u64>, Option<Bytes>), _, _>(
1180
2
                key.as_ref(),
1181
2
                vec![
1182
2
                    RedisKey::from(VERSION_FIELD_NAME),
1183
2
                    RedisKey::from(DATA_FIELD_NAME),
1184
2
                ],
1185
2
            )
1186
2
            .await
1187
2
            .err_tip(|| 
format!("In RedisStore::get_without_version::notversioned {key}")0
)
?0
;
1188
2
        let Some(data) = maybe_data else {
  Branch (1188:13): [True: 0, False: 0]
  Branch (1188:13): [True: 0, False: 0]
  Branch (1188:13): [Folded - Ignored]
  Branch (1188:13): [Folded - Ignored]
  Branch (1188:13): [True: 2, False: 0]
1189
0
            return Ok(None);
1190
        };
1191
2
        Ok(Some(K::decode(maybe_version.unwrap_or(0), data).err_tip(
1192
2
            || 
format!("In RedisStore::get_with_version::notversioned::decode {key}")0
,
1193
2
        )
?0
))
1194
2
    }
1195
}