Coverage Report

Created: 2026-02-23 10:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/redis_store.rs
Line
Count
Source
1
// Copyright 2024-2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::fmt::Debug;
17
use core::ops::{Bound, RangeBounds};
18
use core::pin::Pin;
19
use core::str::FromStr;
20
use core::time::Duration;
21
use std::borrow::Cow;
22
use std::sync::{Arc, Weak};
23
use std::time::Instant;
24
25
use async_trait::async_trait;
26
use bytes::Bytes;
27
use const_format::formatcp;
28
use futures::stream::{self, FuturesUnordered};
29
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt, future};
30
use itertools::izip;
31
use nativelink_config::stores::{RedisMode, RedisSpec};
32
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
33
use nativelink_metric::MetricsComponent;
34
use nativelink_redis_tester::{MockPubSub, SubscriptionManagerNotify};
35
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
36
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
37
use nativelink_util::spawn;
38
use nativelink_util::store_trait::{
39
    BoolValue, RemoveItemCallback, SchedulerCurrentVersionProvider, SchedulerIndexProvider,
40
    SchedulerStore, SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
41
    SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,
42
};
43
use nativelink_util::task::JoinHandleDropGuard;
44
use parking_lot::{Mutex, RwLock};
45
use patricia_tree::StringPatriciaMap;
46
use redis::aio::{ConnectionLike, ConnectionManager, ConnectionManagerConfig, PubSub};
47
use redis::cluster::ClusterClient;
48
use redis::cluster_async::ClusterConnection;
49
use redis::sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType};
50
use redis::{
51
    AsyncCommands, AsyncIter, Client, IntoConnectionInfo, Msg, PushInfo, RedisResult, ScanOptions,
52
    Script, Value, pipe,
53
};
54
use tokio::select;
55
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
56
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
57
use tokio::time::{sleep, timeout};
58
use tokio_stream::wrappers::UnboundedReceiverStream;
59
use tracing::{debug, error, info, trace, warn};
60
use url::Url;
61
use uuid::Uuid;
62
63
use crate::cas_utils::is_zero_digest;
64
use crate::redis_utils::{
65
    FtAggregateCursor, FtAggregateOptions, FtCreateOptions, SearchSchema, ft_aggregate, ft_create,
66
};
67
68
/// The default size, in bytes, of the read chunk when reading data from Redis.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;

/// The default size of the connection pool if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_CONNECTION_POOL_SIZE: usize = 3;

/// The default delay between retries if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_RETRY_DELAY: f32 = 0.1;

/// The default connection timeout in milliseconds if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;

/// The default command timeout in milliseconds if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;

/// The default maximum number of chunk uploads per update.
/// Note: If this changes it should be updated in the config documentation.
pub const DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE: usize = 10;

/// The default COUNT value passed when scanning keys in Redis.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_SCAN_COUNT: usize = 10_000;

/// The default COUNT value passed when scanning search indexes.
/// Note: If this changes it should be updated in the config documentation.
pub const DEFAULT_MAX_COUNT_PER_CURSOR: u64 = 1_500;

/// The default number of client permits if `max_client_permits` is not specified.
/// The permits size the semaphore that gates calls to `get_client()`.
/// NOTE(review): the sibling defaults are mirrored in the config documentation;
/// confirm whether this one should be as well.
const DEFAULT_CLIENT_PERMITS: usize = 500;
101
102
/// A [`StoreDriver`] implementation that uses Redis as a backing store.
///
/// `C` is the connection type used to issue commands and `P` is the type used
/// for pattern subscriptions.
#[derive(MetricsComponent)]
pub struct RedisStore<C, P>
where
    C: ConnectionLike + Clone,
    P: RedisPatternSubscriber,
{
    /// The connection to the backing Redis instance(s). It is cloned for every
    /// client handed out by `get_client()`.
    connection_manager: C,

    /// A channel to publish updates to when a key is added, removed, or modified.
    #[metric(
        help = "The pubsub channel to publish updates to when a key is added, removed, or modified"
    )]
    pub_sub_channel: Option<String>,

    /// The optional pattern subscriber handed in at construction time.
    /// NOTE(review): presumably consumed when the subscription manager is first
    /// created; confirm against `subscription_manager()`.
    pub_sub: Mutex<Option<P>>,

    /// A function used to generate names for temporary keys.
    temp_name_generator_fn: fn() -> String,

    /// A common prefix to prepend to all keys before they are sent to Redis.
    ///
    /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`).
    #[metric(help = "Prefix to append to all keys before sending to Redis")]
    key_prefix: String,

    /// The amount of data to read from Redis at a time.
    #[metric(help = "The amount of data to read from Redis at a time")]
    read_chunk_size: usize,

    /// The maximum number of chunk uploads per update.
    /// This caps how many chunk uploads a single update may issue.
    #[metric(help = "The maximum number of chunk uploads per update")]
    max_chunk_uploads_per_update: usize,

    /// The COUNT value passed when scanning keys in Redis.
    /// This is used to hint the amount of work that should be done per response.
    #[metric(help = "The COUNT value passed when scanning keys in Redis")]
    scan_count: usize,

    /// The COUNT value used with search indexes.
    #[metric(help = "The maximum number of results to return per cursor")]
    max_count_per_cursor: u64,

    /// Redis script used to update a value in redis if the version matches.
    /// This is done by incrementing the version number and then setting the new data
    /// only if the version number matches the existing version number.
    update_if_version_matches_script: Script,

    /// A manager for subscriptions to keys in Redis.
    subscription_manager: Mutex<Option<Arc<RedisSubscriptionManager>>>,

    /// Channel for getting subscription messages. Only used by cluster mode where
    /// the sender is connected at construction time. For standard mode, this is
    /// None and created on demand in `subscription_manager()`.
    subscriber_channel: Mutex<Option<UnboundedReceiver<PushInfo>>>,

    /// Permits to limit inflight Redis requests. Technically only
    /// limits the calls to `get_client()`, but the requests per client
    /// are small enough that it works well enough.
    client_permits: Arc<Semaphore>,
}
165
166
/// Manual `Debug` implementation for [`RedisStore`].
///
/// `connection_manager` and `pub_sub` are skipped because `C` and `P` are not
/// required to implement `Debug`; the output is therefore marked as
/// non-exhaustive.
impl<C: ConnectionLike + Clone, P: RedisPatternSubscriber> Debug for RedisStore<C, P> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("RedisStore")
            .field("pub_sub_channel", &self.pub_sub_channel)
            .field("temp_name_generator_fn", &self.temp_name_generator_fn)
            .field("key_prefix", &self.key_prefix)
            .field("read_chunk_size", &self.read_chunk_size)
            .field(
                "max_chunk_uploads_per_update",
                &self.max_chunk_uploads_per_update,
            )
            .field("scan_count", &self.scan_count)
            .field("max_count_per_cursor", &self.max_count_per_cursor)
            .field(
                "update_if_version_matches_script",
                &self.update_if_version_matches_script,
            )
            .field("subscription_manager", &self.subscription_manager)
            .field("subscriber_channel", &self.subscriber_channel)
            .field("client_permits", &self.client_permits)
            // Signals that some fields are intentionally left out.
            .finish_non_exhaustive()
    }
}
188
189
/// A Redis connection bundled with the semaphore permit that was acquired to
/// obtain it, so that both are dropped together.
struct ClientWithPermit<C: ConnectionLike> {
    /// The connection used to issue commands.
    connection_manager: C,

    // here so it sticks around with the client and doesn't get dropped until that does
    #[allow(dead_code)]
    semaphore_permit: OwnedSemaphorePermit,
}
196
197
impl<C: ConnectionLike> Drop for ClientWithPermit<C> {
198
136
    fn drop(&mut self) {
199
136
        trace!(
200
136
            remaining = self.semaphore_permit.semaphore().available_permits(),
201
136
            "Dropping a client permit"
202
        );
203
136
    }
204
}
205
206
impl<C: ConnectionLike + Clone + Sync, P: RedisPatternSubscriber> RedisStore<C, P> {
207
    /// Used for testing when determinism is required.
208
    #[expect(clippy::too_many_arguments)]
209
22
    pub async fn new_from_builder_and_parts(
210
22
        mut connection_manager: C,
211
22
        pub_sub_channel: Option<String>,
212
22
        pub_sub: Option<P>,
213
22
        temp_name_generator_fn: fn() -> String,
214
22
        key_prefix: String,
215
22
        read_chunk_size: usize,
216
22
        max_chunk_uploads_per_update: usize,
217
22
        scan_count: usize,
218
22
        max_client_permits: usize,
219
22
        max_count_per_cursor: u64,
220
22
        subscriber_channel: Option<UnboundedReceiver<PushInfo>>,
221
22
    ) -> Result<Self, Error> {
222
22
        info!("Redis index fingerprint: {FINGERPRINT_CREATE_INDEX_HEX}");
223
224
22
        let version_set_script = Script::new(LUA_VERSION_SET_SCRIPT);
225
22
        version_set_script
226
22
            .load_async(&mut connection_manager)
227
22
            .await
?0
;
228
229
22
        Ok(Self {
230
22
            connection_manager,
231
22
            pub_sub_channel,
232
22
            pub_sub: Mutex::new(pub_sub),
233
22
            temp_name_generator_fn,
234
22
            key_prefix,
235
22
            read_chunk_size,
236
22
            max_chunk_uploads_per_update,
237
22
            scan_count,
238
22
            update_if_version_matches_script: version_set_script,
239
22
            subscription_manager: Mutex::new(None),
240
22
            subscriber_channel: Mutex::new(subscriber_channel),
241
22
            client_permits: Arc::new(Semaphore::new(max_client_permits)),
242
22
            max_count_per_cursor,
243
22
        })
244
22
    }
245
246
136
    async fn get_client(&self) -> Result<ClientWithPermit<C>, Error> {
247
136
        let local_client_permits = self.client_permits.clone();
248
136
        let remaining = local_client_permits.available_permits();
249
136
        let semaphore_permit = local_client_permits.acquire_owned().await
?0
;
250
136
        trace!(remaining, "Got a client permit");
251
136
        Ok(ClientWithPermit {
252
136
            connection_manager: self.connection_manager.clone(),
253
136
            semaphore_permit,
254
136
        })
255
136
    }
256
257
    /// Encode a [`StoreKey`] so it can be sent to Redis.
258
130
    fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> {
259
130
        let key_body = key.as_str();
260
130
        if self.key_prefix.is_empty() {
  Branch (260:12): [True: 0, False: 0]
  Branch (260:12): [True: 0, False: 0]
  Branch (260:12): [True: 0, False: 0]
  Branch (260:12): [Folded - Ignored]
  Branch (260:12): [True: 113, False: 0]
  Branch (260:12): [True: 12, False: 4]
  Branch (260:12): [True: 1, False: 0]
261
126
            key_body
262
        } else {
263
            // This is in the hot path for all redis operations, so we try to reuse the allocation
264
            // from `key.as_str()` if possible.
265
4
            match key_body {
266
4
                Cow::Owned(mut encoded_key) => {
267
4
                    encoded_key.insert_str(0, &self.key_prefix);
268
4
                    Cow::Owned(encoded_key)
269
                }
270
0
                Cow::Borrowed(body) => {
271
0
                    let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());
272
0
                    encoded_key.push_str(&self.key_prefix);
273
0
                    encoded_key.push_str(body);
274
0
                    Cow::Owned(encoded_key)
275
                }
276
            }
277
        }
278
130
    }
279
280
12
    fn set_spec_defaults(spec: &mut RedisSpec) -> Result<(), Error> {
281
12
        if spec.addresses.is_empty() {
  Branch (281:12): [True: 0, False: 0]
  Branch (281:12): [True: 0, False: 0]
  Branch (281:12): [Folded - Ignored]
  Branch (281:12): [True: 0, False: 3]
  Branch (281:12): [True: 0, False: 9]
282
0
            return Err(make_err!(
283
0
                Code::InvalidArgument,
284
0
                "No addresses were specified in redis store configuration."
285
0
            ));
286
12
        }
287
288
12
        if spec.broadcast_channel_capacity != 0 {
  Branch (288:12): [True: 0, False: 0]
  Branch (288:12): [True: 0, False: 0]
  Branch (288:12): [Folded - Ignored]
  Branch (288:12): [True: 0, False: 3]
  Branch (288:12): [True: 1, False: 8]
289
1
            warn!("broadcast_channel_capacity in Redis spec is deprecated and ignored");
290
11
        }
291
12
        if spec.response_timeout_s != 0 {
  Branch (291:12): [True: 0, False: 0]
  Branch (291:12): [True: 0, False: 0]
  Branch (291:12): [Folded - Ignored]
  Branch (291:12): [True: 0, False: 3]
  Branch (291:12): [True: 0, False: 9]
292
0
            warn!(
293
0
                "response_timeout_s in Redis spec is deprecated and ignored, use command_timeout_ms"
294
            );
295
12
        }
296
12
        if spec.connection_timeout_s != 0 {
  Branch (296:12): [True: 0, False: 0]
  Branch (296:12): [True: 0, False: 0]
  Branch (296:12): [Folded - Ignored]
  Branch (296:12): [True: 0, False: 3]
  Branch (296:12): [True: 0, False: 9]
297
0
            if spec.connection_timeout_ms != 0 {
  Branch (297:16): [True: 0, False: 0]
  Branch (297:16): [True: 0, False: 0]
  Branch (297:16): [Folded - Ignored]
  Branch (297:16): [True: 0, False: 0]
  Branch (297:16): [True: 0, False: 0]
298
0
                return Err(make_err!(
299
0
                    Code::InvalidArgument,
300
0
                    "Both connection_timeout_s and connection_timeout_ms were set, can only have one!"
301
0
                ));
302
0
            }
303
0
            warn!("connection_timeout_s in Redis spec is deprecated, use connection_timeout_ms");
304
0
            spec.connection_timeout_ms = spec.connection_timeout_s * 1000;
305
12
        }
306
12
        if spec.connection_timeout_ms == 0 {
  Branch (306:12): [True: 0, False: 0]
  Branch (306:12): [True: 0, False: 0]
  Branch (306:12): [Folded - Ignored]
  Branch (306:12): [True: 3, False: 0]
  Branch (306:12): [True: 4, False: 5]
307
7
            spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS;
308
7
        
}5
309
12
        if spec.command_timeout_ms == 0 {
  Branch (309:12): [True: 0, False: 0]
  Branch (309:12): [True: 0, False: 0]
  Branch (309:12): [Folded - Ignored]
  Branch (309:12): [True: 3, False: 0]
  Branch (309:12): [True: 7, False: 2]
310
10
            spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS;
311
10
        
}2
312
12
        if spec.connection_pool_size == 0 {
  Branch (312:12): [True: 0, False: 0]
  Branch (312:12): [True: 0, False: 0]
  Branch (312:12): [Folded - Ignored]
  Branch (312:12): [True: 3, False: 0]
  Branch (312:12): [True: 9, False: 0]
313
12
            spec.connection_pool_size = DEFAULT_CONNECTION_POOL_SIZE;
314
12
        
}0
315
12
        if spec.read_chunk_size == 0 {
  Branch (315:12): [True: 0, False: 0]
  Branch (315:12): [True: 0, False: 0]
  Branch (315:12): [Folded - Ignored]
  Branch (315:12): [True: 3, False: 0]
  Branch (315:12): [True: 9, False: 0]
316
12
            spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
317
12
        
}0
318
12
        if spec.max_count_per_cursor == 0 {
  Branch (318:12): [True: 0, False: 0]
  Branch (318:12): [True: 0, False: 0]
  Branch (318:12): [Folded - Ignored]
  Branch (318:12): [True: 3, False: 0]
  Branch (318:12): [True: 9, False: 0]
319
12
            spec.max_count_per_cursor = DEFAULT_MAX_COUNT_PER_CURSOR;
320
12
        
}0
321
12
        if spec.max_chunk_uploads_per_update == 0 {
  Branch (321:12): [True: 0, False: 0]
  Branch (321:12): [True: 0, False: 0]
  Branch (321:12): [Folded - Ignored]
  Branch (321:12): [True: 3, False: 0]
  Branch (321:12): [True: 9, False: 0]
322
12
            spec.max_chunk_uploads_per_update = DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE;
323
12
        
}0
324
12
        if spec.scan_count == 0 {
  Branch (324:12): [True: 0, False: 0]
  Branch (324:12): [True: 0, False: 0]
  Branch (324:12): [Folded - Ignored]
  Branch (324:12): [True: 3, False: 0]
  Branch (324:12): [True: 9, False: 0]
325
12
            spec.scan_count = DEFAULT_SCAN_COUNT;
326
12
        
}0
327
12
        if spec.max_client_permits == 0 {
  Branch (327:12): [True: 0, False: 0]
  Branch (327:12): [True: 0, False: 0]
  Branch (327:12): [Folded - Ignored]
  Branch (327:12): [True: 3, False: 0]
  Branch (327:12): [True: 9, False: 0]
328
12
            spec.max_client_permits = DEFAULT_CLIENT_PERMITS;
329
12
        
}0
330
12
        if spec.retry.delay == 0.0 {
  Branch (330:12): [True: 0, False: 0]
  Branch (330:12): [True: 0, False: 0]
  Branch (330:12): [Folded - Ignored]
  Branch (330:12): [True: 3, False: 0]
  Branch (330:12): [True: 9, False: 0]
331
12
            spec.retry.delay = DEFAULT_RETRY_DELAY;
332
12
        
}0
333
12
        if spec.retry.max_retries == 0 {
  Branch (333:12): [True: 0, False: 0]
  Branch (333:12): [True: 0, False: 0]
  Branch (333:12): [Folded - Ignored]
  Branch (333:12): [True: 3, False: 0]
  Branch (333:12): [True: 9, False: 0]
334
12
            spec.retry.max_retries = 1;
335
12
        
}0
336
12
        trace!(?spec, "redis spec is after setting defaults");
337
12
        Ok(())
338
12
    }
339
}
340
341
impl RedisStore<ClusterConnection, PubSub> {
342
0
    pub async fn new_cluster(mut spec: RedisSpec) -> Result<Arc<Self>, Error> {
343
0
        if spec.mode != RedisMode::Cluster {
  Branch (343:12): [True: 0, False: 0]
  Branch (343:12): [Folded - Ignored]
344
0
            return Err(Error::new(
345
0
                Code::InvalidArgument,
346
0
                "new_cluster only works for Cluster mode".to_string(),
347
0
            ));
348
0
        }
349
0
        Self::set_spec_defaults(&mut spec)?;
350
351
0
        let parsed_addrs: Vec<_> = spec
352
0
            .addresses
353
0
            .iter_mut()
354
0
            .map(|addr| {
355
0
                addr.clone().into_connection_info().map(|connection_info| {
356
0
                    let redis_settings = connection_info
357
0
                        .redis_settings()
358
0
                        .clone()
359
                        // We need RESP3 here because the cluster mode doesn't support RESP2 pubsub
360
                        // See also https://docs.rs/redis/latest/redis/cluster_async/index.html#pubsub
361
0
                        .set_protocol(redis::ProtocolVersion::RESP3);
362
0
                    connection_info.set_redis_settings(redis_settings)
363
0
                })
364
0
            })
365
0
            .collect::<Result<Vec<_>, _>>()?;
366
367
0
        let connection_timeout = Duration::from_millis(spec.connection_timeout_ms);
368
0
        let command_timeout = Duration::from_millis(spec.command_timeout_ms);
369
0
        let (tx, subscriber_channel) = unbounded_channel();
370
371
0
        let builder = ClusterClient::builder(parsed_addrs)
372
0
            .connection_timeout(connection_timeout)
373
0
            .response_timeout(command_timeout)
374
0
            .push_sender(tx)
375
0
            .retries(u32::try_from(spec.retry.max_retries)?);
376
377
0
        let client = builder.build()?;
378
379
0
        Self::new_from_builder_and_parts(
380
0
            client.get_async_connection().await?,
381
0
            spec.experimental_pub_sub_channel.clone(),
382
0
            None,
383
0
            || Uuid::new_v4().to_string(),
384
0
            spec.key_prefix.clone(),
385
0
            spec.read_chunk_size,
386
0
            spec.max_chunk_uploads_per_update,
387
0
            spec.scan_count,
388
0
            spec.max_client_permits,
389
0
            spec.max_count_per_cursor,
390
0
            Some(subscriber_channel),
391
        )
392
0
        .await
393
0
        .map(Arc::new)
394
0
    }
395
}
396
397
impl RedisStore<ConnectionManager, PubSub> {
    /// Create a new `RedisStore` from the given configuration.
    ///
    /// Handles `RedisMode::Standard` and `RedisMode::Sentinel`. Exactly one
    /// address must be configured. A pub/sub connection is opened eagerly and
    /// stored on the returned store.
    ///
    /// # Errors
    ///
    /// * `Code::Unimplemented` when more than one address is configured.
    /// * `Code::Internal` when `spec.mode` is `RedisMode::Cluster` (use
    ///   `RedisStore::new_cluster` instead).
    /// * Any error from default validation, address parsing, or connecting,
    ///   including hitting the connection timeout.
    pub async fn new_standard(mut spec: RedisSpec) -> Result<Arc<Self>, Error> {
        Self::set_spec_defaults(&mut spec)?;

        // `set_spec_defaults` rejects an empty address list, so index 0 exists.
        let addr = spec.addresses.remove(0);
        if !spec.addresses.is_empty() {
            return Err(make_err!(
                Code::Unimplemented,
                "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discover to find the other nodes."
            ));
        }

        let connection_timeout = Duration::from_millis(spec.connection_timeout_ms);
        let command_timeout = Duration::from_millis(spec.command_timeout_ms);

        // `local_addr` is moved into the spawned task below; `addr` stays
        // available for error messages in this function.
        let local_addr = addr.clone();
        // The sentinel scheme is rewritten to the plain redis scheme before the
        // address is parsed into connection info.
        let mut parsed_addr = local_addr
            .replace("redis+sentinel://", "redis://")
            .into_connection_info()?;
        debug!(?parsed_addr, "Parsed redis addr");

        // Client creation runs in a spawned task and is bounded by the
        // connection timeout.
        let client = timeout(
            connection_timeout,
            spawn!("connect", async move {
                match spec.mode {
                    RedisMode::Standard => Client::open(parsed_addr).map_err(Into::<Error>::into),
                    RedisMode::Cluster => {
                        // Returns from the spawned task directly, so the
                        // `err_tip_with_code` below is not applied to this error.
                        return Err(Error::new(
                            Code::Internal,
                            "Use RedisStore::new_cluster for cluster connections".to_owned(),
                        ));
                    }
                    RedisMode::Sentinel => async {
                        // The service name comes from the `sentinelServiceName`
                        // query parameter of the original URL, defaulting to "master".
                        let url_parsing = Url::parse(&local_addr)?;
                        let master_name = url_parsing
                            .query_pairs()
                            .find(|(key, _)| key == "sentinelServiceName")
                            .map_or_else(|| "master".into(), |(_, value)| value.to_string());

                        let redis_connection_info = parsed_addr.redis_settings().clone();
                        let sentinel_connection_info = SentinelNodeConnectionInfo::default()
                            .set_redis_connection_info(redis_connection_info);

                        // We fish this out because sentinels don't support db, we need to set it
                        // on the client only. See also https://github.com/redis-rs/redis-rs/issues/1950
                        let original_db = parsed_addr.redis_settings().db();
                        if original_db != 0 {
                            // sentinel_connection_info has the actual DB set
                            let revised_settings = parsed_addr.redis_settings().clone().set_db(0);
                            parsed_addr = parsed_addr.set_redis_settings(revised_settings);
                        }

                        SentinelClient::build(
                            vec![parsed_addr],
                            master_name,
                            Some(sentinel_connection_info),
                            SentinelServerType::Master,
                        )
                        .map_err(Into::<Error>::into)
                    }
                    // Ask the sentinel client for a client to the current master.
                    .and_then(|mut s| async move { Ok(s.async_get_client().await) })
                    .await?
                    .map_err(Into::<Error>::into),
                }
                .err_tip_with_code(|_e| {
                    (
                        Code::InvalidArgument,
                        format!("While connecting to redis with url: {local_addr}"),
                    )
                })
            }),
        )
        .await
        // Three layers are unwrapped here: the timeout result, what is presumably
        // the spawned task's join result, and the connection result itself.
        .err_tip(|| format!("Timeout while connecting to redis with url: {addr}"))???;

        let connection_manager_config = {
            ConnectionManagerConfig::new()
                .set_number_of_retries(spec.retry.max_retries)
                .set_connection_timeout(Some(connection_timeout))
                .set_response_timeout(Some(command_timeout))
        };

        let err_addr = addr.clone();
        // Opening the pub/sub connection is bounded by the connection timeout too.
        let pub_sub = timeout(connection_timeout, async {
            client.get_async_pubsub().await
        })
        .await
        .err_tip(|| format!("While connecting to redis with url: {err_addr}"))??;

        let connection_manager: ConnectionManager =
            ConnectionManager::new_with_config(client, connection_manager_config)
                .await
                .err_tip(|| format!("While connecting to redis with url: {addr}"))?;

        Self::new_from_builder_and_parts(
            connection_manager,
            spec.experimental_pub_sub_channel.clone(),
            Some(pub_sub),
            || Uuid::new_v4().to_string(),
            spec.key_prefix.clone(),
            spec.read_chunk_size,
            spec.max_chunk_uploads_per_update,
            spec.scan_count,
            spec.max_client_permits,
            spec.max_count_per_cursor,
            None, // Standard mode creates subscription channel on demand
        )
        .await
        .map(Arc::new)
    }
}
509
510
#[async_trait]
511
impl<C: ConnectionLike + Clone + 'static + Send + Sync + Unpin, P: RedisPatternSubscriber + Unpin>
512
    StoreDriver for RedisStore<C, P>
513
{
514
    async fn has_with_results(
515
        self: Pin<&Self>,
516
        keys: &[StoreKey<'_>],
517
        results: &mut [Option<u64>],
518
6
    ) -> Result<(), Error> {
519
        // TODO(palfrey) We could use pipeline here, but it makes retry more
520
        // difficult and it doesn't work very well in cluster mode.
521
        // If we wanted to optimize this with pipeline be careful to
522
        // implement retry and to support cluster mode.
523
524
        izip!(keys.iter(), results.iter_mut(),)
525
6
            .map(|(key, result)| async move {
526
                // We need to do a special pass to ensure our zero key exist.
527
6
                if is_zero_digest(key.borrow()) {
  Branch (527:20): [True: 0, False: 0]
  Branch (527:20): [True: 0, False: 0]
  Branch (527:20): [Folded - Ignored]
  Branch (527:20): [True: 2, False: 4]
  Branch (527:20): [True: 0, False: 0]
528
2
                    *result = Some(0);
529
2
                    return Ok::<_, Error>(());
530
4
                }
531
4
                let encoded_key = self.encode_key(key);
532
533
4
                let mut client = self.get_client().await
?0
;
534
535
                // Redis returns 0 when the key doesn't exist
536
                // AND when the key exists with value of length 0.
537
                // Therefore, we need to check both length and existence
538
                // and do it in a pipeline for efficiency
539
4
                let (blob_len, exists) = pipe()
540
4
                    .strlen(encoded_key.as_ref())
541
4
                    .exists(encoded_key.as_ref())
542
4
                    .query_async::<(u64, bool)>(&mut client.connection_manager)
543
4
                    .await
544
4
                    .err_tip(|| "In RedisStore::has_with_results::all")
?0
;
545
546
4
                *result = if exists { Some(blob_len) } else { 
None0
};
  Branch (546:30): [True: 0, False: 0]
  Branch (546:30): [True: 0, False: 0]
  Branch (546:30): [Folded - Ignored]
  Branch (546:30): [True: 4, False: 0]
  Branch (546:30): [True: 0, False: 0]
547
548
4
                Ok::<_, Error>(())
549
12
            })
550
            .collect::<FuturesUnordered<_>>()
551
            .try_collect()
552
            .await
553
6
    }
554
555
    async fn list(
556
        self: Pin<&Self>,
557
        range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
558
        handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
559
8
    ) -> Result<u64, Error> {
560
        let range = (
561
            range.0.map(StoreKey::into_owned),
562
            range.1.map(StoreKey::into_owned),
563
        );
564
        let pattern = match range.0 {
565
            Bound::Included(ref start) | Bound::Excluded(ref start) => match range.1 {
566
                Bound::Included(ref end) | Bound::Excluded(ref end) => {
567
                    let start = start.as_str();
568
                    let end = end.as_str();
569
                    let max_length = start.len().min(end.len());
570
                    let length = start
571
                        .chars()
572
                        .zip(end.chars())
573
20
                        .position(|(a, b)| a != b)
574
                        .unwrap_or(max_length);
575
                    format!("{}{}*", self.key_prefix, &start[..length])
576
                }
577
                Bound::Unbounded => format!("{}*", self.key_prefix),
578
            },
579
            Bound::Unbounded => format!("{}*", self.key_prefix),
580
        };
581
        let mut client = self.get_client().await?;
582
        trace!(%pattern, count=self.scan_count, "Running SCAN");
583
        let opts = ScanOptions::default()
584
            .with_pattern(pattern)
585
            .with_count(self.scan_count);
586
        let mut scan_stream: AsyncIter<Value> = client
587
            .connection_manager
588
            .scan_options(opts)
589
            .await
590
            .err_tip(|| "During scan_options")?;
591
        let mut iterations = 0;
592
        let mut errors = vec![];
593
        while let Some(key) = scan_stream.next_item().await {
594
            if let Ok(Value::BulkString(raw_key)) = key {
595
                let Ok(str_key) = str::from_utf8(&raw_key) else {
596
                    error!(?raw_key, "Non-utf8 key");
597
                    errors.push(format!("Non-utf8 key {raw_key:?}"));
598
                    continue;
599
                };
600
                if let Some(key) = str_key.strip_prefix(&self.key_prefix) {
601
                    let key = StoreKey::new_str(key);
602
                    if range.contains(&key) {
603
                        iterations += 1;
604
                        if !handler(&key) {
605
                            error!("Issue in handler");
606
                            errors.push("Issue in handler".to_string());
607
                        }
608
                    } else {
609
                        trace!(%key, ?range, "Key not in range");
610
                    }
611
                } else {
612
                    errors.push("Key doesn't match prefix".to_string());
613
                }
614
            } else {
615
                error!(?key, "Non-string in key");
616
                errors.push("Non-string in key".to_string());
617
            }
618
        }
619
        if errors.is_empty() {
620
            Ok(iterations)
621
        } else {
622
            error!(?errors, "Errors in scan stream");
623
            Err(Error::new(Code::Internal, format!("Errors: {errors:?}")))
624
        }
625
8
    }
626
627
    async fn update(
628
        self: Pin<&Self>,
629
        key: StoreKey<'_>,
630
        mut reader: DropCloserReadHalf,
631
        _upload_size: UploadSizeInfo,
632
8
    ) -> Result<(), Error> {
633
        let final_key = self.encode_key(&key);
634
635
        // While the name generation function can be supplied by the user, we need to have the curly
636
        // braces in place in order to manage redis' hashing behavior and make sure that the temporary
637
        // key name and the final key name are directed to the same cluster node. See
638
        // https://redis.io/blog/redis-clustering-best-practices-with-keys/
639
        //
640
        // The TL;DR is that if we're in cluster mode and the names hash differently, we can't use request
641
        // pipelining. By using these braces, we tell redis to only hash the part of the temporary key that's
642
        // identical to the final key -- so they will always hash to the same node.
643
        let temp_key = format!(
644
            "temp-{}-{{{}}}",
645
            (self.temp_name_generator_fn)(),
646
            &final_key
647
        );
648
649
        if is_zero_digest(key.borrow()) {
650
            let chunk = reader
651
                .peek()
652
                .await
653
                .err_tip(|| "Failed to peek in RedisStore::update")?;
654
            if chunk.is_empty() {
655
                reader
656
                    .drain()
657
                    .await
658
                    .err_tip(|| "Failed to drain in RedisStore::update")?;
659
                // Zero-digest keys are special -- we don't need to do anything with it.
660
                return Ok(());
661
            }
662
        }
663
664
        let mut client = self.get_client().await?;
665
666
        let mut read_stream = reader
667
7
            .scan(0u32, |bytes_read, chunk_res| {
668
7
                future::ready(Some(
669
7
                    chunk_res
670
7
                        .err_tip(|| "Failed to read chunk in update in redis store")
671
7
                        .and_then(|chunk| 
{6
672
6
                            let offset = isize::try_from(*bytes_read).err_tip(|| "Could not convert offset to isize in RedisStore::update")
?0
;
673
6
                            let chunk_len = u32::try_from(chunk.len()).err_tip(
674
                                || "Could not convert chunk length to u32 in RedisStore::update",
675
0
                            )?;
676
6
                            let new_bytes_read = bytes_read
677
6
                                .checked_add(chunk_len)
678
6
                                .err_tip(|| "Overflow protection in RedisStore::update")
?0
;
679
6
                            *bytes_read = new_bytes_read;
680
6
                            Ok::<_, Error>((offset, *bytes_read, chunk))
681
6
                        }),
682
                ))
683
7
            }).zip(
684
            stream::repeat(client.connection_manager.clone()))
685
7
            .map(|(res, mut connection_manager)| {
686
7
                let (
offset6
,
end_pos6
,
chunk6
) = res
?1
;
687
6
                let temp_key_ref = &temp_key;
688
6
                Ok(async move {
689
6
                    connection_manager
690
6
                        .setrange::<_, _, usize>(temp_key_ref, offset, chunk.to_vec())
691
6
                        .await
692
6
                        .err_tip(
693
1
                            || format!("While appending to temp key ({temp_key_ref}) in RedisStore::update. offset = {offset}. end_pos = {end_pos}"),
694
1
                        )?;
695
5
                    Ok::<u32, Error>(end_pos)
696
6
                })
697
7
            })
698
            .try_buffer_unordered(self.max_chunk_uploads_per_update);
699
700
        let mut total_len: u32 = 0;
701
        while let Some(last_pos) = read_stream.try_next().await? {
702
            if last_pos > total_len {
703
                total_len = last_pos;
704
            }
705
        }
706
707
        let blob_len: usize = client
708
            .connection_manager
709
            .strlen(&temp_key)
710
            .await
711
0
            .err_tip(|| format!("In RedisStore::update strlen check for {temp_key}"))?;
712
        // This is a safety check to ensure that in the event some kind of retry was to happen
713
        // and the data was appended to the key twice, we reject the data.
714
        if blob_len != usize::try_from(total_len).unwrap_or(usize::MAX) {
715
            return Err(make_input_err!(
716
                "Data length mismatch in RedisStore::update for {}({}) - expected {} bytes, got {} bytes",
717
                key.borrow().as_str(),
718
                temp_key,
719
                total_len,
720
                blob_len,
721
            ));
722
        }
723
724
        // Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost.
725
        client
726
            .connection_manager
727
            .rename::<_, _, ()>(&temp_key, final_key.as_ref())
728
            .await
729
            .err_tip(|| "While queueing key rename in RedisStore::update()")?;
730
731
        // If we have a publish channel configured, send a notice that the key has been set.
732
        if let Some(pub_sub_channel) = &self.pub_sub_channel {
733
            return Ok(client
734
                .connection_manager
735
                .publish(pub_sub_channel, final_key.as_ref())
736
                .await?);
737
        }
738
739
        Ok(())
740
8
    }
741
742
    async fn get_part(
743
        self: Pin<&Self>,
744
        key: StoreKey<'_>,
745
        writer: &mut DropCloserWriteHalf,
746
        offset: u64,
747
        length: Option<u64>,
748
5
    ) -> Result<(), Error> {
749
        let offset = isize::try_from(offset).err_tip(|| "Could not convert offset to isize")?;
750
        let length = length
751
4
            .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
752
            .transpose()?;
753
754
        // To follow RBE spec we need to consider any digest's with
755
        // zero size to be existing.
756
        if is_zero_digest(key.borrow()) {
757
            return writer
758
                .send_eof()
759
                .err_tip(|| "Failed to send zero EOF in redis store get_part");
760
        }
761
762
        let encoded_key = self.encode_key(&key);
763
        let encoded_key = encoded_key.as_ref();
764
765
        // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we
766
        // do math with indices we change them to be exclusive at the end.
767
768
        // We want to read the data at the key from `offset` to `offset + length`.
769
        let data_start = offset;
770
        let data_end = data_start
771
            .saturating_add(length.unwrap_or(isize::MAX as usize) as isize)
772
            .saturating_sub(1);
773
774
        // And we don't ever want to read more than `read_chunk_size` bytes at a time, so we'll need to iterate.
775
        let mut chunk_start = data_start;
776
        let mut chunk_end = cmp::min(
777
            data_start.saturating_add(self.read_chunk_size as isize) - 1,
778
            data_end,
779
        );
780
781
        let mut client = self.get_client().await?;
782
        loop {
783
            let chunk: Bytes = client
784
                .connection_manager
785
                .getrange(encoded_key, chunk_start, chunk_end)
786
                .await
787
                .err_tip(|| "In RedisStore::get_part::getrange")?;
788
789
            let didnt_receive_full_chunk = chunk.len() < self.read_chunk_size;
790
            let reached_end_of_data = chunk_end == data_end;
791
792
            if didnt_receive_full_chunk || reached_end_of_data {
793
                if !chunk.is_empty() {
794
                    writer
795
                        .send(chunk)
796
                        .await
797
                        .err_tip(|| "Failed to write data in RedisStore::get_part")?;
798
                }
799
800
                break; // No more data to read.
801
            }
802
803
            // We received a full chunk's worth of data, so write it...
804
            writer
805
                .send(chunk)
806
                .await
807
                .err_tip(|| "Failed to write data in RedisStore::get_part")?;
808
809
            // ...and go grab the next chunk.
810
            chunk_start = chunk_end + 1;
811
            chunk_end = cmp::min(
812
                chunk_start.saturating_add(self.read_chunk_size as isize) - 1,
813
                data_end,
814
            );
815
        }
816
817
        // If we didn't write any data, check if the key exists, if not return a NotFound error.
818
        // This is required by spec.
819
        if writer.get_bytes_written() == 0 {
820
            // We're supposed to read 0 bytes, so just check if the key exists.
821
            let exists: bool = client
822
                .connection_manager
823
                .exists(encoded_key)
824
                .await
825
                .err_tip(|| "In RedisStore::get_part::zero_exists")?;
826
827
            if !exists {
828
                return Err(make_err!(
829
                    Code::NotFound,
830
                    "Data not found in Redis store for digest: {key:?}"
831
                ));
832
            }
833
        }
834
835
        writer
836
            .send_eof()
837
            .err_tip(|| "Failed to write EOF in redis store get_part")
838
5
    }
839
840
0
    /// Returns this store itself as the inner driver; the optional digest is ignored.
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }
843
844
0
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) {
845
0
        self
846
0
    }
847
848
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> {
849
0
        self
850
0
    }
851
852
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
853
0
        registry.register_indicator(self);
854
0
    }
855
856
0
    fn register_remove_callback(
857
0
        self: Arc<Self>,
858
0
        _callback: Arc<dyn RemoveItemCallback>,
859
0
    ) -> Result<(), Error> {
860
        // As redis doesn't drop stuff, we can just ignore this
861
0
        Ok(())
862
0
    }
863
}
864
865
#[async_trait]
866
impl<C: ConnectionLike + Clone + 'static + Send + Sync + Unpin, P: RedisPatternSubscriber + Unpin>
867
    HealthStatusIndicator for RedisStore<C, P>
868
{
869
1
    fn get_name(&self) -> &'static str {
870
1
        "RedisStore"
871
1
    }
872
873
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
874
        StoreDriver::check_health(Pin::new(self), namespace).await
875
0
    }
876
}
877
878
// -------------------------------------------------------------------
879
// Everything below this line is specific to the redis scheduler implementation.
880
// -------------------------------------------------------------------
881
882
/// Idle time, in milliseconds, after which a redis cursor is closed (30 seconds).
const CURSOR_IDLE_MS: u64 = 30 * 1_000;
/// Field of the Redis hash under which the data is stored.
const DATA_FIELD_NAME: &str = "data";
/// Field of the Redis hash under which the version is stored.
const VERSION_FIELD_NAME: &str = "version";
/// Time to live of indexes, in seconds (24 hours). After this time redis may delete the index.
const INDEX_TTL_S: u64 = 24 * 60 * 60;
890
891
#[allow(rustdoc::broken_intra_doc_links)]
892
/// Lua script to set a key if the version matches.
893
/// Args:
894
///   KEYS[1]: The key where the version is stored.
895
///   ARGV[1]: The expected version.
896
///   ARGV[2]: The new data.
897
///   ARGV[3*]: Key-value pairs of additional data to include.
898
/// Returns:
899
///   The new version if the version matches. nil is returned if the
900
///   value was not set.
901
pub const LUA_VERSION_SET_SCRIPT: &str = formatcp!(
902
    r"
903
local key = KEYS[1]
904
local expected_version = tonumber(ARGV[1])
905
local new_data = ARGV[2]
906
local new_version = redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', 1)
907
local i
908
local indexes = {{}}
909
910
if new_version-1 ~= expected_version then
911
    redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', -1)
912
    return {{ 0, new_version-1 }}
913
end
914
-- Skip first 2 argvs, as they are known inputs.
915
-- Remember: Lua is 1-indexed.
916
for i=3, #ARGV do
917
    indexes[i-2] = ARGV[i]
918
end
919
920
-- In testing we witnessed redis sometimes not update our FT indexes
921
-- resulting in stale data. It appears if we delete our keys then insert
922
-- them again it works and reduces risk significantly.
923
redis.call('DEL', key)
924
redis.call('HSET', key, '{DATA_FIELD_NAME}', new_data, '{VERSION_FIELD_NAME}', new_version, unpack(indexes))
925
926
return {{ 1, new_version }}
927
"
928
);
929
930
/// Hard-coded hexadecimal fingerprint of the `FT.CREATE` index template. The unit
/// test in the `test` module recomputes the value and asserts that it still matches.
const FINGERPRINT_CREATE_INDEX_HEX: &str = "3e762c15";
932
933
#[cfg(test)]
mod test {
    use super::FINGERPRINT_CREATE_INDEX_HEX;

    /// Text of the `FT.CREATE` command template used to create the index.
    const CREATE_INDEX_TEMPLATE: &str = "FT.CREATE {} ON HASH PREFIX 1 {} NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA {} TAG CASESENSITIVE SORTABLE";

    /// Fingerprint of the `FT.CREATE` template, computed with a bitwise
    /// CRC32-style loop over the template bytes. It does not need to be a
    /// valid CRC32 checksum; it only has to be a stable identifier with a low
    /// chance of collision.
    const fn fingerprint_create_index_template() -> u32 {
        const POLY: u32 = 0xEDB8_8320;
        const DATA: &[u8] = CREATE_INDEX_TEMPLATE.as_bytes();
        let mut crc: u32 = 0xFFFF_FFFF;
        let mut idx = 0;
        while idx < DATA.len() {
            crc ^= DATA[idx] as u32;

            // Process the eight bits of the byte that was just mixed in.
            let mut bit = 0;
            while bit < 8 {
                let low_bit_set = crc & 1 != 0;
                crc >>= 1;
                if low_bit_set {
                    crc ^= POLY;
                }
                bit += 1;
            }
            idx += 1;
        }
        crc
    }

    /// The recomputed fingerprint must equal the hard-coded constant.
    #[test]
    fn test_fingerprint_value() {
        let computed = format!("{:08x}", fingerprint_create_index_template());
        assert_eq!(computed, FINGERPRINT_CREATE_INDEX_HEX,);
    }
}
976
977
/// Builds the name of the index for a given field as
/// `<prefix>_<field>_<sort>_<fingerprint>`, where `<sort>` is empty when no sort
/// key is given. Embedding the template fingerprint means a change to the index
/// definition produces a different index name.
macro_rules! get_index_name {
    ($key_prefix:expr, $index_field:expr, $sort_key:expr) => {
        format_args!(
            "{}_{}_{}_{}",
            $key_prefix,
            $index_field,
            $sort_key.unwrap_or(""),
            FINGERPRINT_CREATE_INDEX_HEX
        )
    };
}
991
992
/// Checks whether a string is usable as a Redis key component.
///
/// The string is never modified: it is returned unchanged when every byte is an
/// ASCII letter, an ASCII digit or `_`, and `None` is returned otherwise.
const fn try_sanitize(s: &str) -> Option<&str> {
    // `for` loops and iterators are not usable in a `const fn`, hence the manual
    // index loop. Keeping the function `const` lets it be evaluated at compile
    // time when the input is constant.
    let bytes = s.as_bytes();
    let mut idx: usize = 0;
    while idx < bytes.len() {
        let byte = bytes[idx];
        let allowed = byte.is_ascii_alphanumeric() || byte == b'_';
        if !allowed {
            return None;
        }
        idx += 1;
    }
    Some(s)
}
1013
1014
/// An individual subscription to a key in Redis.
1015
#[derive(Debug)]
1016
pub struct RedisSubscription {
1017
    receiver: Option<tokio::sync::watch::Receiver<String>>,
1018
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
1019
}
1020
1021
impl SchedulerSubscription for RedisSubscription {
1022
    /// Wait for the subscription key to change.
1023
18
    async fn changed(&mut self) -> Result<(), Error> {
1024
18
        let receiver = self
1025
18
            .receiver
1026
18
            .as_mut()
1027
18
            .ok_or_else(|| make_err!(
Code::Internal0
, "In RedisSubscription::changed::as_mut"))
?0
;
1028
18
        receiver
1029
18
            .changed()
1030
18
            .await
1031
15
            .map_err(|_| make_err!(
Code::Internal0
, "In RedisSubscription::changed::changed"))
1032
15
    }
1033
}
1034
1035
// If the subscription is dropped, we need to possibly remove the key from the
1036
// subscribed keys map.
1037
impl Drop for RedisSubscription {
1038
4
    fn drop(&mut self) {
1039
4
        let Some(receiver) = self.receiver.take() else {
  Branch (1039:13): [True: 4, False: 0]
  Branch (1039:13): [Folded - Ignored]
1040
0
            warn!("RedisSubscription has already been dropped, nothing to do.");
1041
0
            return; // Already dropped, nothing to do.
1042
        };
1043
4
        let key = receiver.borrow().clone();
1044
        // IMPORTANT: This must be dropped before receiver_count() is called.
1045
4
        drop(receiver);
1046
4
        let Some(
subscribed_keys1
) = self.weak_subscribed_keys.upgrade() else {
  Branch (1046:13): [True: 1, False: 3]
  Branch (1046:13): [Folded - Ignored]
1047
3
            return; // Already dropped, nothing to do.
1048
        };
1049
1
        let mut subscribed_keys = subscribed_keys.write();
1050
1
        let Some(value) = subscribed_keys.get(&key) else {
  Branch (1050:13): [True: 1, False: 0]
  Branch (1050:13): [Folded - Ignored]
1051
0
            error!(
1052
0
                "Key {key} was not found in subscribed keys when checking if it should be removed."
1053
            );
1054
0
            return;
1055
        };
1056
        // If we have no receivers, cleanup the entry from our map.
1057
1
        if value.receiver_count() == 0 {
  Branch (1057:12): [True: 1, False: 0]
  Branch (1057:12): [Folded - Ignored]
1058
1
            subscribed_keys.remove(key);
1059
1
        
}0
1060
4
    }
1061
}
1062
1063
/// A publisher for a key in Redis.
1064
#[derive(Debug)]
1065
struct RedisSubscriptionPublisher {
1066
    sender: Mutex<tokio::sync::watch::Sender<String>>,
1067
}
1068
1069
impl RedisSubscriptionPublisher {
1070
4
    fn new(
1071
4
        key: String,
1072
4
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
1073
4
    ) -> (Self, RedisSubscription) {
1074
4
        let (sender, receiver) = tokio::sync::watch::channel(key);
1075
4
        let publisher = Self {
1076
4
            sender: Mutex::new(sender),
1077
4
        };
1078
4
        let subscription = RedisSubscription {
1079
4
            receiver: Some(receiver),
1080
4
            weak_subscribed_keys,
1081
4
        };
1082
4
        (publisher, subscription)
1083
4
    }
1084
1085
0
    fn subscribe(
1086
0
        &self,
1087
0
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
1088
0
    ) -> RedisSubscription {
1089
0
        let receiver = self.sender.lock().subscribe();
1090
0
        RedisSubscription {
1091
0
            receiver: Some(receiver),
1092
0
            weak_subscribed_keys,
1093
0
        }
1094
0
    }
1095
1096
1
    fn receiver_count(&self) -> usize {
1097
1
        self.sender.lock().receiver_count()
1098
1
    }
1099
1100
18
    fn notify(&self) {
1101
        // TODO(https://github.com/sile/patricia_tree/issues/40) When this is addressed
1102
        // we can remove the `Mutex` and use the mutable iterator directly.
1103
18
        self.sender.lock().send_modify(|_| {});
1104
18
    }
1105
}
1106
1107
#[derive(Debug, Clone)]
1108
pub struct RedisSubscriptionManager {
1109
    subscribed_keys: Arc<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
1110
    tx_for_test: tokio::sync::mpsc::UnboundedSender<String>,
1111
    _subscription_spawn: Arc<Mutex<JoinHandleDropGuard<()>>>,
1112
}
1113
1114
/// Trait for subscribing to Redis pub/sub channels with pattern matching.
1115
pub trait RedisPatternSubscriber: Send + 'static {
1116
    /// Subscribe to channels matching the given pattern.
1117
    #[allow(clippy::manual_async_fn)]
1118
    fn subscribe_to_pattern(
1119
        &mut self,
1120
        channel_pattern: &str,
1121
    ) -> impl Future<Output = RedisResult<Pin<Box<dyn Stream<Item = Msg> + '_ + Send>>>> + Send;
1122
}
1123
1124
impl RedisPatternSubscriber for PubSub {
1125
    #[allow(clippy::manual_async_fn)]
1126
3
    fn subscribe_to_pattern(
1127
3
        &mut self,
1128
3
        channel_pattern: &str,
1129
3
    ) -> impl Future<Output = RedisResult<Pin<Box<dyn Stream<Item = Msg> + '_ + Send>>>> + Send
1130
    {
1131
3
        async move {
1132
3
            self.psubscribe(channel_pattern).await
?0
;
1133
3
            Ok(self.on_message().boxed())
1134
3
        }
1135
3
    }
1136
}
1137
1138
impl RedisPatternSubscriber for MockPubSub {
1139
    #[allow(clippy::manual_async_fn)]
1140
0
    fn subscribe_to_pattern(
1141
0
        &mut self,
1142
0
        _channel_pattern: &str,
1143
0
    ) -> impl Future<Output = RedisResult<Pin<Box<dyn Stream<Item = Msg> + '_ + Send>>>> + Send
1144
    {
1145
1
        async move { Ok(stream::pending().boxed()) }
1146
0
    }
1147
}
1148
1149
impl RedisSubscriptionManager {
1150
4
    pub fn new<P>(
1151
4
        mut pub_sub: P,
1152
4
        subscriber_channel: Option<UnboundedReceiver<PushInfo>>,
1153
4
        pub_sub_channel: String,
1154
4
    ) -> Self
1155
4
    where
1156
4
        P: RedisPatternSubscriber,
1157
    {
1158
4
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
1159
4
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);
1160
4
        let (tx_for_test, mut rx_for_test) = unbounded_channel();
1161
4
        let mut local_subscriber_channel: Pin<Box<dyn Stream<Item = PushInfo> + Send>> =
1162
4
            subscriber_channel.map_or_else(
1163
4
                || stream::pending::<PushInfo>().boxed(),
1164
0
                |channel| UnboundedReceiverStream::new(channel).boxed(),
1165
            );
1166
        Self {
1167
4
            subscribed_keys,
1168
4
            tx_for_test,
1169
4
            _subscription_spawn: Arc::new(Mutex::new(spawn!(
1170
                "redis_subscribe_spawn",
1171
4
                async move {
1172
4
                    let mut stream = match pub_sub.subscribe_to_pattern(&pub_sub_channel).await {
1173
0
                        Err(e) => {
1174
0
                            error!(?e, "Failed to subscribe to Redis pattern");
1175
0
                            return;
1176
                        }
1177
4
                        Ok(s) => s,
1178
                    };
1179
                    loop {
1180
                        loop {
1181
18
                            let 
key14
= select! {
1182
18
                                
value14
= rx_for_test.recv() => {
1183
14
                                    let Some(value) = value else {
  Branch (1183:41): [True: 0, False: 0]
  Branch (1183:41): [Folded - Ignored]
  Branch (1183:41): [Folded - Ignored]
  Branch (1183:41): [True: 14, False: 0]
  Branch (1183:41): [True: 0, False: 0]
1184
0
                                        unreachable!("Channel should never close");
1185
                                    };
1186
14
                                    value
1187
                                },
1188
18
                                
msg0
= stream.next() => {
1189
0
                                    if let Some(msg) = msg {
  Branch (1189:44): [True: 0, False: 0]
  Branch (1189:44): [Folded - Ignored]
  Branch (1189:44): [Folded - Ignored]
  Branch (1189:44): [True: 0, False: 0]
  Branch (1189:44): [True: 0, False: 0]
1190
0
                                        match msg.get_payload().expect("Valid payload") {
1191
0
                                            Value::SimpleString(s) => {
1192
0
                                                s.clone()
1193
                                            }
1194
0
                                            Value::BulkString(v) => {
1195
0
                                                String::from_utf8(v).expect("String message")
1196
                                            }
1197
                                            _ => {
1198
0
                                                error!(?msg, "Received non-string message in RedisSubscriptionManager");
1199
0
                                                continue;
1200
                                            }
1201
                                        }
1202
                                    } else {
1203
                                        // Check to see if our parent has been dropped and if so kill spawn.
1204
0
                                        if subscribed_keys_weak.upgrade().is_none() {
  Branch (1204:44): [True: 0, False: 0]
  Branch (1204:44): [Folded - Ignored]
  Branch (1204:44): [Folded - Ignored]
  Branch (1204:44): [True: 0, False: 0]
  Branch (1204:44): [True: 0, False: 0]
1205
0
                                            warn!("It appears our parent has been dropped, exiting RedisSubscriptionManager spawn");
1206
0
                                            return;
1207
0
                                        }
1208
0
                                        error!("Error receiving message in RedisSubscriptionManager reconnecting and flagging everything changed");
1209
0
                                        break;
1210
                                    }
1211
                                },
1212
18
                                
maybe_push_info0
= local_subscriber_channel.next() => {
1213
0
                                    if let Some(push_info) = maybe_push_info {
  Branch (1213:44): [True: 0, False: 0]
  Branch (1213:44): [Folded - Ignored]
  Branch (1213:44): [Folded - Ignored]
  Branch (1213:44): [True: 0, False: 0]
  Branch (1213:44): [True: 0, False: 0]
1214
0
                                        if push_info.data.len() != 1 {
  Branch (1214:44): [True: 0, False: 0]
  Branch (1214:44): [Folded - Ignored]
  Branch (1214:44): [Folded - Ignored]
  Branch (1214:44): [True: 0, False: 0]
  Branch (1214:44): [True: 0, False: 0]
1215
0
                                            error!(?push_info, "Expected exactly one message on subscriber_channel");
1216
0
                                            continue;
1217
0
                                        }
1218
0
                                        match push_info.data.first().unwrap() {
1219
0
                                            Value::SimpleString(s) => {
1220
0
                                                s.clone()
1221
                                            }
1222
0
                                            Value::BulkString(v) => {
1223
0
                                                String::from_utf8(v.clone()).expect("String message")
1224
                                            }
1225
0
                                            other => {
1226
0
                                                error!(?other, "Received non-string message in RedisSubscriptionManager");
1227
0
                                                continue;
1228
                                            }
1229
                                        }
1230
                                    } else {
1231
0
                                        error!("Error receiving message in RedisSubscriptionManager from subscriber_channel");
1232
0
                                        break;
1233
                                    }
1234
                                }
1235
                            };
1236
14
                            let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
  Branch (1236:33): [True: 0, False: 0]
  Branch (1236:33): [Folded - Ignored]
  Branch (1236:33): [Folded - Ignored]
  Branch (1236:33): [True: 14, False: 0]
  Branch (1236:33): [True: 0, False: 0]
1237
0
                                warn!(
1238
0
                                    "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
1239
                                );
1240
0
                                return;
1241
                            };
1242
14
                            let subscribed_keys_mux = subscribed_keys.read();
1243
14
                            subscribed_keys_mux
1244
14
                                .common_prefix_values(&*key)
1245
14
                                .for_each(RedisSubscriptionPublisher::notify);
1246
                        }
1247
                        // Sleep for a small amount of time to ensure we don't reconnect too quickly.
1248
0
                        sleep(Duration::from_secs(1)).await;
1249
                        // If we reconnect or lag behind we might have had dirty keys, so we need to
1250
                        // flag all of them as changed.
1251
0
                        let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
  Branch (1251:29): [True: 0, False: 0]
  Branch (1251:29): [Folded - Ignored]
  Branch (1251:29): [Folded - Ignored]
  Branch (1251:29): [True: 0, False: 0]
  Branch (1251:29): [True: 0, False: 0]
1252
0
                            warn!(
1253
0
                                "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
1254
                            );
1255
0
                            return;
1256
                        };
1257
0
                        let subscribed_keys_mux = subscribed_keys.read();
1258
                        // Just in case also get a new receiver.
1259
0
                        for publisher in subscribed_keys_mux.values() {
1260
0
                            publisher.notify();
1261
0
                        }
1262
                    }
1263
0
                }
1264
            ))),
1265
        }
1266
4
    }
1267
}
1268
1269
impl SubscriptionManagerNotify for RedisSubscriptionManager {
1270
14
    fn notify_for_test(&self, value: String) {
1271
14
        self.tx_for_test.send(value).unwrap();
1272
14
    }
1273
}
1274
1275
impl SchedulerSubscriptionManager for RedisSubscriptionManager {
1276
    type Subscription = RedisSubscription;
1277
1278
4
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
1279
4
    where
1280
4
        K: SchedulerStoreKeyProvider,
1281
    {
1282
4
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
1283
4
        let mut subscribed_keys = self.subscribed_keys.write();
1284
4
        let key = key.get_key();
1285
4
        let key_str = key.as_str();
1286
4
        let mut subscription = if let Some(
publisher0
) = subscribed_keys.get(&key_str) {
  Branch (1286:39): [True: 0, False: 0]
  Branch (1286:39): [Folded - Ignored]
  Branch (1286:39): [Folded - Ignored]
  Branch (1286:39): [True: 0, False: 4]
1287
0
            publisher.subscribe(weak_subscribed_keys)
1288
        } else {
1289
4
            let (publisher, subscription) =
1290
4
                RedisSubscriptionPublisher::new(key_str.to_string(), weak_subscribed_keys);
1291
4
            subscribed_keys.insert(key_str, publisher);
1292
4
            subscription
1293
        };
1294
4
        subscription
1295
4
            .receiver
1296
4
            .as_mut()
1297
4
            .ok_or_else(|| 
{0
1298
0
                make_err!(
1299
0
                    Code::Internal,
1300
                    "Receiver should be set in RedisSubscriptionManager::subscribe"
1301
                )
1302
0
            })?
1303
4
            .mark_changed();
1304
1305
4
        Ok(subscription)
1306
4
    }
1307
1308
1
    fn is_reliable() -> bool {
1309
1
        false
1310
1
    }
1311
}
1312
1313
impl<C: Clone + ConnectionLike + Sync + Send + 'static, P: RedisPatternSubscriber> SchedulerStore
1314
    for RedisStore<C, P>
1315
{
1316
    type SubscriptionManager = RedisSubscriptionManager;
1317
1318
6
    fn subscription_manager(&self) -> Result<Arc<RedisSubscriptionManager>, Error> {
1319
6
        let mut subscription_manager = self.subscription_manager.lock();
1320
1321
6
        if let Some(
subscription_manager3
) = &*subscription_manager {
  Branch (1321:16): [True: 0, False: 0]
  Branch (1321:16): [Folded - Ignored]
  Branch (1321:16): [Folded - Ignored]
  Branch (1321:16): [True: 3, False: 3]
1322
3
            Ok(subscription_manager.clone())
1323
        } else {
1324
3
            let Some(pub_sub_channel) = &self.pub_sub_channel else {
  Branch (1324:17): [True: 0, False: 0]
  Branch (1324:17): [Folded - Ignored]
  Branch (1324:17): [Folded - Ignored]
  Branch (1324:17): [True: 3, False: 0]
1325
0
                return Err(make_input_err!(
1326
0
                    "RedisStore must have a pubsub channel for a Redis Scheduler if using subscriptions"
1327
0
                ));
1328
            };
1329
3
            let mut lock_pub_sub = self.pub_sub.lock();
1330
3
            let Some(pub_sub) = lock_pub_sub.take() else {
  Branch (1330:17): [True: 0, False: 0]
  Branch (1330:17): [Folded - Ignored]
  Branch (1330:17): [Folded - Ignored]
  Branch (1330:17): [True: 3, False: 0]
1331
0
                return Err(make_input_err!(
1332
0
                    "RedisStore must have a pubsub for Redis Scheduler if using subscriptions"
1333
0
                ));
1334
            };
1335
3
            let sub = Arc::new(RedisSubscriptionManager::new(
1336
3
                pub_sub,
1337
3
                self.subscriber_channel.lock().take(),
1338
3
                pub_sub_channel.clone(),
1339
            ));
1340
3
            *subscription_manager = Some(sub.clone());
1341
3
            Ok(sub)
1342
        }
1343
6
    }
1344
1345
19
    async fn update_data<T>(&self, data: T) -> Result<Option<i64>, Error>
1346
19
    where
1347
19
        T: SchedulerStoreDataProvider
1348
19
            + SchedulerStoreKeyProvider
1349
19
            + SchedulerCurrentVersionProvider
1350
19
            + Send,
1351
19
    {
1352
19
        let key = data.get_key();
1353
19
        let redis_key = self.encode_key(&key);
1354
19
        let mut client = self.get_client().await
?0
;
1355
19
        let maybe_index = data.get_indexes().err_tip(|| 
{0
1356
0
            format!("Err getting index in RedisStore::update_data::versioned for {redis_key}")
1357
0
        })?;
1358
19
        if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {
  Branch (1358:12): [True: 0, False: 0]
  Branch (1358:12): [True: 0, False: 0]
  Branch (1358:12): [True: 0, False: 0]
  Branch (1358:12): [Folded - Ignored]
  Branch (1358:12): [Folded - Ignored]
  Branch (1358:12): [True: 0, False: 3]
  Branch (1358:12): [True: 0, False: 0]
  Branch (1358:12): [True: 16, False: 0]
1359
16
            let current_version = data.current_version();
1360
16
            let data = data.try_into_bytes().err_tip(|| 
{0
1361
0
                format!("Could not convert value to bytes in RedisStore::update_data::versioned for {redis_key}")
1362
0
            })?;
1363
16
            let mut script = self
1364
16
                .update_if_version_matches_script
1365
16
                .key(redis_key.as_ref());
1366
16
            let mut script_invocation = script.arg(format!("{current_version}")).arg(data.to_vec());
1367
64
            for (
name48
,
value48
) in maybe_index {
1368
48
                script_invocation = script_invocation.arg(name).arg(value.to_vec());
1369
48
            }
1370
16
            let start = Instant::now();
1371
16
            let (success, new_version): (bool, i64) = script_invocation
1372
16
                .invoke_async(&mut client.connection_manager)
1373
16
                .await
1374
16
                .err_tip(|| format!(
"In RedisStore::update_data::versioned for {key:?}"0
))
?0
;
1375
1376
16
            let elapsed = start.elapsed();
1377
1378
16
            if elapsed > Duration::from_millis(100) {
  Branch (1378:16): [True: 0, False: 0]
  Branch (1378:16): [True: 0, False: 0]
  Branch (1378:16): [True: 0, False: 0]
  Branch (1378:16): [Folded - Ignored]
  Branch (1378:16): [Folded - Ignored]
  Branch (1378:16): [True: 0, False: 0]
  Branch (1378:16): [True: 0, False: 0]
  Branch (1378:16): [True: 0, False: 16]
1379
0
                warn!(
1380
                    %redis_key,
1381
                    ?elapsed,
1382
0
                    "Slow Redis version-set operation"
1383
                );
1384
16
            }
1385
16
            if !success {
  Branch (1385:16): [True: 0, False: 0]
  Branch (1385:16): [True: 0, False: 0]
  Branch (1385:16): [True: 0, False: 0]
  Branch (1385:16): [Folded - Ignored]
  Branch (1385:16): [Folded - Ignored]
  Branch (1385:16): [True: 0, False: 0]
  Branch (1385:16): [True: 0, False: 0]
  Branch (1385:16): [True: 4, False: 12]
1386
4
                warn!(
1387
                    %redis_key,
1388
                    %key,
1389
                    %current_version,
1390
                    %new_version,
1391
4
                    caller = core::any::type_name::<T>(),
1392
4
                    "Redis version conflict - optimistic lock failed"
1393
                );
1394
4
                return Ok(None);
1395
12
            }
1396
12
            trace!(
1397
                %redis_key,
1398
                %key,
1399
                old_version = %current_version,
1400
                %new_version,
1401
12
                "Updated redis key to new version"
1402
            );
1403
            // If we have a publish channel configured, send a notice that the key has been set.
1404
12
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (1404:20): [True: 0, False: 0]
  Branch (1404:20): [True: 0, False: 0]
  Branch (1404:20): [True: 0, False: 0]
  Branch (1404:20): [Folded - Ignored]
  Branch (1404:20): [Folded - Ignored]
  Branch (1404:20): [True: 0, False: 0]
  Branch (1404:20): [True: 0, False: 0]
  Branch (1404:20): [True: 12, False: 0]
1405
12
                return Ok(client
1406
12
                    .connection_manager
1407
12
                    .publish(pub_sub_channel, redis_key.as_ref())
1408
12
                    .await
?0
);
1409
0
            }
1410
0
            Ok(Some(new_version))
1411
        } else {
1412
3
            let data = data.try_into_bytes().err_tip(|| 
{0
1413
0
                format!("Could not convert value to bytes in RedisStore::update_data::noversion for {redis_key}")
1414
0
            })?;
1415
3
            let mut fields: Vec<(String, _)> = vec![];
1416
3
            fields.push((DATA_FIELD_NAME.into(), data.to_vec()));
1417
3
            for (
name0
,
value0
) in maybe_index {
1418
0
                fields.push((name.into(), value.to_vec()));
1419
0
            }
1420
3
            client
1421
3
                .connection_manager
1422
3
                .hset_multiple::<_, _, _, ()>(redis_key.as_ref(), &fields)
1423
3
                .await
1424
3
                .err_tip(|| format!(
"In RedisStore::update_data::noversion for {redis_key}"0
))
?0
;
1425
            // If we have a publish channel configured, send a notice that the key has been set.
1426
3
            if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (1426:20): [True: 0, False: 0]
  Branch (1426:20): [True: 0, False: 0]
  Branch (1426:20): [True: 0, False: 0]
  Branch (1426:20): [Folded - Ignored]
  Branch (1426:20): [Folded - Ignored]
  Branch (1426:20): [True: 3, False: 0]
  Branch (1426:20): [True: 0, False: 0]
  Branch (1426:20): [True: 0, False: 0]
1427
3
                return Ok(client
1428
3
                    .connection_manager
1429
3
                    .publish(pub_sub_channel, redis_key.as_ref())
1430
3
                    .await
?0
);
1431
0
            }
1432
0
            Ok(Some(0)) // Always use "0" version since this is not a versioned request.
1433
        }
1434
19
    }
1435
1436
24
    async fn search_by_index_prefix<K>(
1437
24
        &self,
1438
24
        index: K,
1439
24
    ) -> Result<
1440
24
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
1441
24
        Error,
1442
24
    >
1443
24
    where
1444
24
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
1445
24
    {
1446
24
        let index_value = index.index_value();
1447
25
        let 
run_ft_aggregate24
= || {
1448
25
            let connection_manager = self.connection_manager.clone();
1449
25
            let sanitized_field = try_sanitize(index_value.as_ref()).err_tip(|| 
{0
1450
0
                format!("In RedisStore::search_by_index_prefix::try_sanitize - {index_value:?}")
1451
0
            })?;
1452
25
            Ok::<_, Error>(async move {
1453
25
                ft_aggregate(
1454
25
                    connection_manager,
1455
25
                    format!(
1456
25
                        "{}",
1457
25
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1458
                    ),
1459
25
                    if sanitized_field.is_empty() {
  Branch (1459:24): [True: 0, False: 0]
  Branch (1459:24): [True: 0, False: 0]
  Branch (1459:24): [Folded - Ignored]
  Branch (1459:24): [Folded - Ignored]
  Branch (1459:24): [True: 0, False: 17]
  Branch (1459:24): [True: 0, False: 3]
  Branch (1459:24): [True: 2, False: 3]
1460
2
                        "*".to_string()
1461
                    } else {
1462
23
                        format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_field)
1463
                    },
1464
                    FtAggregateOptions {
1465
25
                        load: vec![DATA_FIELD_NAME.into(), VERSION_FIELD_NAME.into()],
1466
25
                        cursor: FtAggregateCursor {
1467
25
                            count: self.max_count_per_cursor,
1468
25
                            max_idle: CURSOR_IDLE_MS,
1469
25
                        },
1470
25
                        sort_by: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| vec![
format!22
(
"@{v}"22
)]),
1471
                    },
1472
                )
1473
25
                .await
1474
25
            })
1475
25
        };
1476
1477
24
        let 
stream23
= run_ft_aggregate()
?0
1478
24
            .or_else(|_| async move 
{1
1479
1
                let mut schema = vec![SearchSchema {
1480
1
                    field_name: K::INDEX_NAME.into(),
1481
1
                    sortable: false,
1482
1
                }];
1483
1
                if let Some(sort_key) = K::MAYBE_SORT_KEY {
  Branch (1483:24): [True: 0, False: 0]
  Branch (1483:24): [True: 0, False: 0]
  Branch (1483:24): [Folded - Ignored]
  Branch (1483:24): [Folded - Ignored]
  Branch (1483:24): [True: 0, False: 0]
  Branch (1483:24): [True: 0, False: 0]
  Branch (1483:24): [True: 1, False: 0]
1484
1
                    schema.push(SearchSchema {
1485
1
                        field_name: sort_key.into(),
1486
1
                        sortable: true,
1487
1
                    });
1488
1
                
}0
1489
1490
1
                let create_result = ft_create(
1491
1
                    self.connection_manager.clone(),
1492
1
                    format!(
1493
1
                        "{}",
1494
1
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1495
1
                    ),
1496
1
                    FtCreateOptions {
1497
1
                        prefixes: vec![K::KEY_PREFIX.into()],
1498
1
                        nohl: true,
1499
1
                        nofields: true,
1500
1
                        nofreqs: true,
1501
1
                        nooffsets: true,
1502
1
                        temporary: Some(INDEX_TTL_S),
1503
1
                    },
1504
1
                    schema,
1505
1
                )
1506
1
                .await
1507
1
                .err_tip(|| {
1508
1
                    format!(
1509
1
                        "Error with ft_create in RedisStore::search_by_index_prefix({})",
1510
1
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1511
                    )
1512
1
                });
1513
1
                let run_result = run_ft_aggregate()
?0
.await.err_tip(|| {
1514
1
                    format!(
1515
1
                        "Error with second ft_aggregate in RedisStore::search_by_index_prefix({})",
1516
1
                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
1517
                    )
1518
1
                });
1519
                // Creating the index will race which is ok. If it fails to create, we only
1520
                // error if the second ft_aggregate call fails and fails to create.
1521
1
                run_result.or_else(move |e| create_result.merge(Err(e)))
1522
2
            })
1523
24
            .await
?1
;
1524
23
        Ok(stream.filter_map(|result| async move 
{13
1525
13
            let raw_redis_map = match result {
1526
13
                Ok(v) => v,
1527
0
                Err(e) => {
1528
                    return Some(
1529
0
                        Err(Error::from(e))
1530
0
                            .err_tip(|| "Error in stream of in RedisStore::search_by_index_prefix"),
1531
                    );
1532
                }
1533
            };
1534
1535
13
            let Some(redis_map) = raw_redis_map.as_sequence() else {
  Branch (1535:17): [True: 0, False: 0]
  Branch (1535:17): [True: 0, False: 0]
  Branch (1535:17): [Folded - Ignored]
  Branch (1535:17): [Folded - Ignored]
  Branch (1535:17): [True: 8, False: 0]
  Branch (1535:17): [True: 2, False: 0]
  Branch (1535:17): [True: 3, False: 0]
1536
0
                return Some(Err(Error::new(
1537
0
                    Code::Internal,
1538
0
                    format!("Non-array from ft_aggregate: {raw_redis_map:?}"),
1539
0
                )));
1540
            };
1541
13
            let mut redis_map_iter = redis_map.iter();
1542
13
            let mut bytes_data: Option<Bytes> = None;
1543
13
            let mut version: Option<i64> = None;
1544
            loop {
1545
40
                let Some(
key27
) = redis_map_iter.next() else {
  Branch (1545:21): [True: 0, False: 0]
  Branch (1545:21): [True: 0, False: 0]
  Branch (1545:21): [Folded - Ignored]
  Branch (1545:21): [Folded - Ignored]
  Branch (1545:21): [True: 16, False: 8]
  Branch (1545:21): [True: 4, False: 2]
  Branch (1545:21): [True: 7, False: 3]
1546
13
                    break;
1547
                };
1548
27
                let value = redis_map_iter.next().unwrap();
1549
27
                let Value::BulkString(k) = key else {
  Branch (1549:21): [True: 0, False: 0]
  Branch (1549:21): [True: 0, False: 0]
  Branch (1549:21): [Folded - Ignored]
  Branch (1549:21): [Folded - Ignored]
  Branch (1549:21): [True: 16, False: 0]
  Branch (1549:21): [True: 4, False: 0]
  Branch (1549:21): [True: 7, False: 0]
1550
0
                    return Some(Err(Error::new(
1551
0
                        Code::Internal,
1552
0
                        format!("Non-BulkString key from ft_aggregate: {key:?}"),
1553
0
                    )));
1554
                };
1555
27
                let Ok(str_key) = str::from_utf8(k) else {
  Branch (1555:21): [True: 0, False: 0]
  Branch (1555:21): [True: 0, False: 0]
  Branch (1555:21): [Folded - Ignored]
  Branch (1555:21): [Folded - Ignored]
  Branch (1555:21): [True: 16, False: 0]
  Branch (1555:21): [True: 4, False: 0]
  Branch (1555:21): [True: 7, False: 0]
1556
0
                    return Some(Err(Error::new(
1557
0
                        Code::Internal,
1558
0
                        format!("Non-utf8 key from ft_aggregate: {key:?}"),
1559
0
                    )));
1560
                };
1561
27
                let Value::BulkString(v) = value else {
  Branch (1561:21): [True: 0, False: 0]
  Branch (1561:21): [True: 0, False: 0]
  Branch (1561:21): [Folded - Ignored]
  Branch (1561:21): [Folded - Ignored]
  Branch (1561:21): [True: 16, False: 0]
  Branch (1561:21): [True: 4, False: 0]
  Branch (1561:21): [True: 7, False: 0]
1562
0
                    return Some(Err(Error::new(
1563
0
                        Code::Internal,
1564
0
                        format!("Non-BulkString value from ft_aggregate: {key:?}"),
1565
0
                    )));
1566
                };
1567
27
                match str_key {
1568
27
                    DATA_FIELD_NAME => {
1569
13
                        bytes_data = Some(v.clone().into());
1570
13
                    }
1571
14
                    VERSION_FIELD_NAME => {
1572
13
                        let Ok(str_v) = str::from_utf8(v) else {
  Branch (1572:29): [True: 0, False: 0]
  Branch (1572:29): [True: 0, False: 0]
  Branch (1572:29): [Folded - Ignored]
  Branch (1572:29): [Folded - Ignored]
  Branch (1572:29): [True: 8, False: 0]
  Branch (1572:29): [True: 2, False: 0]
  Branch (1572:29): [True: 3, False: 0]
1573
0
                            return Some(Err(Error::new(
1574
0
                                Code::Internal,
1575
0
                                format!("Non-utf8 version value from ft_aggregate: {v:?}"),
1576
0
                            )));
1577
                        };
1578
13
                        let Ok(raw_version) = str_v.parse::<i64>() else {
  Branch (1578:29): [True: 0, False: 0]
  Branch (1578:29): [True: 0, False: 0]
  Branch (1578:29): [Folded - Ignored]
  Branch (1578:29): [Folded - Ignored]
  Branch (1578:29): [True: 8, False: 0]
  Branch (1578:29): [True: 2, False: 0]
  Branch (1578:29): [True: 3, False: 0]
1579
0
                            return Some(Err(Error::new(
1580
0
                                Code::Internal,
1581
0
                                format!("Non-integer version value from ft_aggregate: {str_v:?}"),
1582
0
                            )));
1583
                        };
1584
13
                        version = Some(raw_version);
1585
                    }
1586
1
                    other => {
1587
1
                        if K::MAYBE_SORT_KEY == Some(other) {
  Branch (1587:28): [True: 0, False: 0]
  Branch (1587:28): [True: 0, False: 0]
  Branch (1587:28): [Folded - Ignored]
  Branch (1587:28): [Folded - Ignored]
  Branch (1587:28): [True: 0, False: 0]
  Branch (1587:28): [True: 0, False: 0]
  Branch (1587:28): [True: 1, False: 0]
1588
1
                            // ignore sort keys
1589
1
                        } else {
1590
0
                            return Some(Err(Error::new(
1591
0
                                Code::Internal,
1592
0
                                format!("Extra keys from ft_aggregate: {other}"),
1593
0
                            )));
1594
                        }
1595
                    }
1596
                }
1597
            }
1598
13
            let Some(found_bytes_data) = bytes_data else {
  Branch (1598:17): [True: 0, False: 0]
  Branch (1598:17): [True: 0, False: 0]
  Branch (1598:17): [Folded - Ignored]
  Branch (1598:17): [Folded - Ignored]
  Branch (1598:17): [True: 8, False: 0]
  Branch (1598:17): [True: 2, False: 0]
  Branch (1598:17): [True: 3, False: 0]
1599
0
                return Some(Err(Error::new(
1600
0
                    Code::Internal,
1601
0
                    format!("Missing '{DATA_FIELD_NAME}' in ft_aggregate, got: {raw_redis_map:?}"),
1602
0
                )));
1603
            };
1604
            Some(
1605
13
                K::decode(version.unwrap_or(0), found_bytes_data)
1606
13
                    .err_tip(|| "In RedisStore::search_by_index_prefix::decode"),
1607
            )
1608
26
        }))
1609
24
    }
1610
1611
94
    async fn get_and_decode<K>(
1612
94
        &self,
1613
94
        key: K,
1614
94
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
1615
94
    where
1616
94
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
1617
94
    {
1618
94
        let key = key.get_key();
1619
94
        let key = self.encode_key(&key);
1620
94
        let mut client = self.get_client().await
?0
;
1621
94
        let results: Vec<Value> = client
1622
94
            .connection_manager
1623
94
            .hmget::<_, Vec<String>, Vec<Value>>(
1624
94
                key.as_ref(),
1625
94
                vec![VERSION_FIELD_NAME.into(), DATA_FIELD_NAME.into()],
1626
94
            )
1627
94
            .await
1628
94
            .err_tip(|| format!(
"In RedisStore::get_without_version::notversioned {key}"0
))
?0
;
1629
94
        let Some(Value::BulkString(
data48
)) = results.get(1) else {
  Branch (1629:13): [True: 0, False: 0]
  Branch (1629:13): [True: 0, False: 0]
  Branch (1629:13): [True: 0, False: 0]
  Branch (1629:13): [Folded - Ignored]
  Branch (1629:13): [Folded - Ignored]
  Branch (1629:13): [True: 2, False: 0]
  Branch (1629:13): [True: 0, False: 45]
  Branch (1629:13): [True: 46, False: 1]
1630
46
            return Ok(None);
1631
        };
1632
        #[allow(clippy::get_first)]
1633
48
        let version = if let Some(raw_v) = results.get(0) {
  Branch (1633:30): [True: 0, False: 0]
  Branch (1633:30): [True: 0, False: 0]
  Branch (1633:30): [True: 0, False: 0]
  Branch (1633:30): [Folded - Ignored]
  Branch (1633:30): [Folded - Ignored]
  Branch (1633:30): [True: 2, False: 0]
  Branch (1633:30): [True: 0, False: 0]
  Branch (1633:30): [True: 46, False: 0]
1634
48
            match raw_v {
1635
0
                Value::Int(v) => *v,
1636
46
                Value::BulkString(v) => i64::from_str(str::from_utf8(v).expect("utf-8 bulkstring"))
1637
46
                    .expect("integer bulkstring"),
1638
2
                Value::Nil => 0,
1639
                _ => {
1640
0
                    warn!(?raw_v, "Non-integer version!");
1641
0
                    0
1642
                }
1643
            }
1644
        } else {
1645
0
            0
1646
        };
1647
        Ok(Some(
1648
48
            K::decode(version, Bytes::from(data.clone())).err_tip(|| 
{0
1649
0
                format!("In RedisStore::get_with_version::notversioned::decode {key}")
1650
0
            })?,
1651
        ))
1652
94
    }
1653
}