/build/source/nativelink-store/src/redis_store.rs
Line  | Count  | Source  | 
1  |  | // Copyright 2024 The NativeLink Authors. All rights reserved.  | 
2  |  | //  | 
3  |  | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");  | 
4  |  | // you may not use this file except in compliance with the License.  | 
5  |  | // You may obtain a copy of the License at  | 
6  |  | //  | 
7  |  | //    See LICENSE file for details  | 
8  |  | //  | 
9  |  | // Unless required by applicable law or agreed to in writing, software  | 
10  |  | // distributed under the License is distributed on an "AS IS" BASIS,  | 
11  |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
12  |  | // See the License for the specific language governing permissions and  | 
13  |  | // limitations under the License.  | 
14  |  |  | 
15  |  | use core::cmp;  | 
16  |  | use core::ops::{Bound, RangeBounds}; | 
17  |  | use core::pin::Pin;  | 
18  |  | use core::time::Duration;  | 
19  |  | use std::borrow::Cow;  | 
20  |  | use std::sync::{Arc, Weak}; | 
21  |  |  | 
22  |  | use async_trait::async_trait;  | 
23  |  | use bytes::Bytes;  | 
24  |  | use const_format::formatcp;  | 
25  |  | use fred::clients::{Pool as RedisPool, SubscriberClient}; | 
26  |  | use fred::interfaces::{ClientLike, KeysInterface, PubsubInterface}; | 
27  |  | use fred::prelude::{Client, EventInterface, HashesInterface, RediSearchInterface}; | 
28  |  | use fred::types::config::{ | 
29  |  |     Config as RedisConfig, ConnectionConfig, PerformanceConfig, ReconnectPolicy, UnresponsiveConfig,  | 
30  |  | };  | 
31  |  | use fred::types::redisearch::{ | 
32  |  |     AggregateOperation, FtAggregateOptions, FtCreateOptions, IndexKind, Load, SearchField,  | 
33  |  |     SearchSchema, SearchSchemaKind, WithCursor,  | 
34  |  | };  | 
35  |  | use fred::types::scan::Scanner;  | 
36  |  | use fred::types::scripts::Script;  | 
37  |  | use fred::types::{Builder, Key as RedisKey, Map as RedisMap, SortOrder, Value as RedisValue}; | 
38  |  | use futures::stream::FuturesUnordered;  | 
39  |  | use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future}; | 
40  |  | use nativelink_config::stores::{RedisMode, RedisSpec}; | 
41  |  | use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err}; | 
42  |  | use nativelink_metric::MetricsComponent;  | 
43  |  | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; | 
44  |  | use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; | 
45  |  | use nativelink_util::spawn;  | 
46  |  | use nativelink_util::store_trait::{ | 
47  |  |     BoolValue, RemoveItemCallback, SchedulerCurrentVersionProvider, SchedulerIndexProvider,  | 
48  |  |     SchedulerStore, SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,  | 
49  |  |     SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,  | 
50  |  | };  | 
51  |  | use nativelink_util::task::JoinHandleDropGuard;  | 
52  |  | use parking_lot::{Mutex, RwLock}; | 
53  |  | use patricia_tree::StringPatriciaMap;  | 
54  |  | use tokio::select;  | 
55  |  | use tokio::time::sleep;  | 
56  |  | use tracing::{error, info, warn}; | 
57  |  | use uuid::Uuid;  | 
58  |  |  | 
59  |  | use crate::cas_utils::is_zero_digest;  | 
60  |  | use crate::redis_utils::ft_aggregate;  | 
61  |  |  | 
/// The default size of the read chunk when reading data from Redis.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;

/// The default size of the connection pool if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_CONNECTION_POOL_SIZE: usize = 3;

/// The default delay between retries, in seconds, if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_RETRY_DELAY: f32 = 0.1;
/// The amount of jitter to add to the retry delay if not specified.
/// The value is a multiplier applied to the retry delay, not an absolute duration.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_RETRY_JITTER: f32 = 0.5;

/// The default maximum capacity of the broadcast channel if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_BROADCAST_CHANNEL_CAPACITY: usize = 4096;

/// The default connection timeout in milliseconds if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;

/// The default command timeout in milliseconds if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;

/// The default maximum number of chunk uploads per update.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE: usize = 10;

/// The default COUNT value passed when scanning keys in Redis.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_SCAN_COUNT: u32 = 10_000;
96  |  |  | 
/// A [`StoreDriver`] implementation that uses Redis as a backing store.
#[derive(Debug, MetricsComponent)]
pub struct RedisStore {
    /// The client pool connecting to the backing Redis instance(s).
    client_pool: RedisPool,

    /// A channel to publish updates to when a key is added, removed, or modified.
    #[metric(
        help = "The pubsub channel to publish updates to when a key is added, removed, or modified"
    )]
    pub_sub_channel: Option<String>,

    /// A redis client for managing subscriptions.
    /// TODO: This should be moved into the store in followups once a standard use pattern has been determined.
    subscriber_client: SubscriberClient,

    /// A function used to generate names for temporary keys.
    temp_name_generator_fn: fn() -> String,

    /// A common prefix prepended to all keys before they are sent to Redis.
    ///
    /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`).
    #[metric(help = "Prefix to append to all keys before sending to Redis")]
    key_prefix: String,

    /// The amount of data to read from Redis at a time.
    #[metric(help = "The amount of data to read from Redis at a time")]
    read_chunk_size: usize,

    /// The maximum number of chunk uploads per update.
    /// This bounds how many chunk uploads may be in flight concurrently during a
    /// single call to `update`.
    #[metric(help = "The maximum number of chunk uploads per update")]
    max_chunk_uploads_per_update: usize,

    /// The COUNT value passed when scanning keys in Redis.
    /// This is used to hint the amount of work that should be done per response.
    #[metric(help = "The COUNT value passed when scanning keys in Redis")]
    scan_count: u32,

    /// Redis script used to update a value in redis if the version matches.
    /// This is done by incrementing the version number and then setting the new data
    /// only if the version number matches the existing version number.
    update_if_version_matches_script: Script,

    /// A manager for subscriptions to keys in Redis.
    subscription_manager: Mutex<Option<Arc<RedisSubscriptionManager>>>,
}
144  |  |  | 
145  |  | impl RedisStore { | 
146  |  |     /// Create a new `RedisStore` from the given configuration.  | 
147  | 2  |     pub fn new(mut spec: RedisSpec) -> Result<Arc<Self>, Error> { | 
148  | 2  |         if spec.addresses.is_empty() {  Branch (148:12): [True: 0, False: 2]
   Branch (148:12): [Folded - Ignored]
  | 
149  | 0  |             return Err(make_err!(  | 
150  | 0  |                 Code::InvalidArgument,  | 
151  | 0  |                 "No addresses were specified in redis store configuration."  | 
152  | 0  |             ));  | 
153  | 2  |         }  | 
154  | 2  |         let [addr] = spec.addresses.as_slice() else {  Branch (154:13): [True: 2, False: 0]
   Branch (154:13): [Folded - Ignored]
  | 
155  | 0  |             return Err(make_err!(  | 
156  | 0  |                 Code::Unimplemented,  | 
157  | 0  |                 "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discover to find the other nodes."  | 
158  | 0  |             ));  | 
159  |  |         };  | 
160  | 2  |         let redis_config = match spec.mode { | 
161  | 0  |             RedisMode::Cluster => RedisConfig::from_url_clustered(addr),  | 
162  | 0  |             RedisMode::Sentinel => RedisConfig::from_url_sentinel(addr),  | 
163  | 2  |             RedisMode::Standard => RedisConfig::from_url_centralized(addr),  | 
164  |  |         }  | 
165  | 2  |         .err_tip_with_code(|e| {0   | 
166  | 0  |             (  | 
167  | 0  |                 Code::InvalidArgument,  | 
168  | 0  |                 format!("while parsing redis node address: {e}"), | 
169  | 0  |             )  | 
170  | 0  |         })?;  | 
171  |  |  | 
172  | 2  |         let reconnect_policy = { | 
173  | 2  |             if spec.retry.delay == 0.0 {  Branch (173:16): [True: 2, False: 0]
   Branch (173:16): [Folded - Ignored]
  | 
174  | 2  |                 spec.retry.delay = DEFAULT_RETRY_DELAY;  | 
175  | 2  |             }0   | 
176  | 2  |             if spec.retry.jitter == 0.0 {  Branch (176:16): [True: 2, False: 0]
   Branch (176:16): [Folded - Ignored]
  | 
177  | 2  |                 spec.retry.jitter = DEFAULT_RETRY_JITTER;  | 
178  | 2  |             }0   | 
179  |  |  | 
180  | 4  |             let to_ms2  = |secs: f32| -> u32 {  | 
181  | 4  |                 Duration::from_secs_f32(secs)  | 
182  | 4  |                     .as_millis()  | 
183  | 4  |                     .try_into()  | 
184  | 4  |                     .unwrap_or(u32::MAX)  | 
185  | 4  |             };  | 
186  |  |  | 
187  | 2  |             let max_retries = u32::try_from(spec.retry.max_retries)  | 
188  | 2  |                 .err_tip(|| "max_retries could not be converted to u32 in RedisStore::new")?0 ;  | 
189  |  |  | 
190  | 2  |             let min_delay_ms = to_ms(spec.retry.delay);  | 
191  | 2  |             let max_delay_ms = 8000;  | 
192  | 2  |             let jitter = to_ms(spec.retry.jitter * spec.retry.delay);  | 
193  |  |  | 
194  | 2  |             let mut reconnect_policy =  | 
195  | 2  |                 ReconnectPolicy::new_exponential(max_retries, min_delay_ms, max_delay_ms, 2);  | 
196  | 2  |             reconnect_policy.set_jitter(jitter);  | 
197  | 2  |             reconnect_policy  | 
198  |  |         };  | 
199  |  |  | 
200  |  |         { | 
201  | 2  |             if spec.broadcast_channel_capacity == 0 {  Branch (201:16): [True: 2, False: 0]
   Branch (201:16): [Folded - Ignored]
  | 
202  | 2  |                 spec.broadcast_channel_capacity = DEFAULT_BROADCAST_CHANNEL_CAPACITY;  | 
203  | 2  |             }0   | 
204  | 2  |             if spec.connection_timeout_ms == 0 {  Branch (204:16): [True: 2, False: 0]
   Branch (204:16): [Folded - Ignored]
  | 
205  | 2  |                 spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS;  | 
206  | 2  |             }0   | 
207  | 2  |             if spec.command_timeout_ms == 0 {  Branch (207:16): [True: 2, False: 0]
   Branch (207:16): [Folded - Ignored]
  | 
208  | 2  |                 spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS;  | 
209  | 2  |             }0   | 
210  | 2  |             if spec.connection_pool_size == 0 {  Branch (210:16): [True: 2, False: 0]
   Branch (210:16): [Folded - Ignored]
  | 
211  | 2  |                 spec.connection_pool_size = DEFAULT_CONNECTION_POOL_SIZE;  | 
212  | 2  |             }0   | 
213  | 2  |             if spec.read_chunk_size == 0 {  Branch (213:16): [True: 2, False: 0]
   Branch (213:16): [Folded - Ignored]
  | 
214  | 2  |                 spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;  | 
215  | 2  |             }0   | 
216  | 2  |             if spec.max_chunk_uploads_per_update == 0 {  Branch (216:16): [True: 2, False: 0]
   Branch (216:16): [Folded - Ignored]
  | 
217  | 2  |                 spec.max_chunk_uploads_per_update = DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE;  | 
218  | 2  |             }0   | 
219  | 2  |             if spec.scan_count == 0 {  Branch (219:16): [True: 2, False: 0]
   Branch (219:16): [Folded - Ignored]
  | 
220  | 2  |                 spec.scan_count = DEFAULT_SCAN_COUNT;  | 
221  | 2  |             }0   | 
222  |  |         }  | 
223  | 2  |         let connection_timeout = Duration::from_millis(spec.connection_timeout_ms);  | 
224  | 2  |         let command_timeout = Duration::from_millis(spec.command_timeout_ms);  | 
225  |  |  | 
226  | 2  |         let mut builder = Builder::from_config(redis_config);  | 
227  | 2  |         builder  | 
228  | 2  |             .set_performance_config(PerformanceConfig { | 
229  | 2  |                 default_command_timeout: command_timeout,  | 
230  | 2  |                 broadcast_channel_capacity: spec.broadcast_channel_capacity,  | 
231  | 2  |                 ..Default::default()  | 
232  | 2  |             })  | 
233  | 2  |             .set_connection_config(ConnectionConfig { | 
234  | 2  |                 connection_timeout,  | 
235  | 2  |                 internal_command_timeout: command_timeout,  | 
236  | 2  |                 unresponsive: UnresponsiveConfig { | 
237  | 2  |                     max_timeout: Some(connection_timeout),  | 
238  | 2  |                     // This number needs to be less than the connection timeout.  | 
239  | 2  |                     // We use 4 as it is a good balance between not spamming the server  | 
240  | 2  |                     // and not waiting too long.  | 
241  | 2  |                     interval: connection_timeout / 4,  | 
242  | 2  |                 },  | 
243  | 2  |                 ..Default::default()  | 
244  | 2  |             })  | 
245  | 2  |             .set_policy(reconnect_policy);  | 
246  |  |  | 
247  | 2  |         let client_pool = builder  | 
248  | 2  |             .build_pool(spec.connection_pool_size)  | 
249  | 2  |             .err_tip(|| "while creating redis connection pool")?0 ;  | 
250  |  |  | 
251  | 2  |         let subscriber_client = builder  | 
252  | 2  |             .build_subscriber_client()  | 
253  | 2  |             .err_tip(|| "while creating redis subscriber client")?0 ;  | 
254  |  |  | 
255  | 2  |         Self::new_from_builder_and_parts(  | 
256  | 2  |             client_pool,  | 
257  | 2  |             subscriber_client,  | 
258  | 2  |             spec.experimental_pub_sub_channel.clone(),  | 
259  | 1  |             || Uuid::new_v4().to_string(),  | 
260  | 2  |             spec.key_prefix.clone(),  | 
261  | 2  |             spec.read_chunk_size,  | 
262  | 2  |             spec.max_chunk_uploads_per_update,  | 
263  | 2  |             spec.scan_count,  | 
264  |  |         )  | 
265  | 2  |         .map(Arc::new)  | 
266  | 2  |     }  | 
267  |  |  | 
268  |  |     /// Used for testing when determinism is required.  | 
269  |  |     #[expect(clippy::too_many_arguments)]  | 
270  | 15  |     pub fn new_from_builder_and_parts(  | 
271  | 15  |         client_pool: RedisPool,  | 
272  | 15  |         subscriber_client: SubscriberClient,  | 
273  | 15  |         pub_sub_channel: Option<String>,  | 
274  | 15  |         temp_name_generator_fn: fn() -> String,  | 
275  | 15  |         key_prefix: String,  | 
276  | 15  |         read_chunk_size: usize,  | 
277  | 15  |         max_chunk_uploads_per_update: usize,  | 
278  | 15  |         scan_count: u32,  | 
279  | 15  |     ) -> Result<Self, Error> { | 
280  |  |         // Start connection pool (this will retry forever by default).  | 
281  | 15  |         client_pool.connect();  | 
282  | 15  |         subscriber_client.connect();  | 
283  |  |  | 
284  | 15  |         info!("Redis index fingerprint: {FINGERPRINT_CREATE_INDEX_HEX}"); | 
285  |  |  | 
286  | 15  |         Ok(Self { | 
287  | 15  |             client_pool,  | 
288  | 15  |             pub_sub_channel,  | 
289  | 15  |             subscriber_client,  | 
290  | 15  |             temp_name_generator_fn,  | 
291  | 15  |             key_prefix,  | 
292  | 15  |             read_chunk_size,  | 
293  | 15  |             max_chunk_uploads_per_update,  | 
294  | 15  |             scan_count,  | 
295  | 15  |             update_if_version_matches_script: Script::from_lua(LUA_VERSION_SET_SCRIPT),  | 
296  | 15  |             subscription_manager: Mutex::new(None),  | 
297  | 15  |         })  | 
298  | 15  |     }  | 
299  |  |  | 
300  | 83  |     async fn get_client(&'_ self) -> Result<&'_ Client, Error> { | 
301  | 83  |         let client = self.client_pool.next();  | 
302  | 83  |         let config = client.client_config();  | 
303  | 83  |         if config.mocks.is_none() {  Branch (303:12): [True: 0, False: 0]
   Branch (303:12): [True: 2, False: 24]
   Branch (303:12): [Folded - Ignored]
   Branch (303:12): [True: 0, False: 57]
  | 
304  | 2  |             client.wait_for_connect().await.err_tip(||  | 
305  |  |                 format!(  | 
306  | 2  |                     "Connection issue connecting to redis server with hosts: {:?}, username: {}, database: {}", | 
307  | 2  |                     config.server.hosts().iter().map(|s| format!("{}:{}", s.host, s.port)).collect::<Vec<String>>(), | 
308  | 2  |                     config.username.unwrap_or_else(|| "None".to_string()),  | 
309  | 2  |                     config.database.unwrap_or_default()  | 
310  |  |                 )  | 
311  | 2  |             )?;  | 
312  | 81  |         }  | 
313  | 81  |         Ok(client)  | 
314  | 83  |     }  | 
315  |  |  | 
316  |  |     /// Encode a [`StoreKey`] so it can be sent to Redis.  | 
317  | 74  |     fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> { | 
318  | 74  |         let key_body = key.as_str();  | 
319  | 74  |         if self.key_prefix.is_empty() {  Branch (319:12): [True: 70, False: 4]
   Branch (319:12): [Folded - Ignored]
  | 
320  | 70  |             key_body  | 
321  |  |         } else { | 
322  |  |             // This is in the hot path for all redis operations, so we try to reuse the allocation  | 
323  |  |             // from `key.as_str()` if possible.  | 
324  | 4  |             match key_body { | 
325  | 4  |                 Cow::Owned(mut encoded_key) => { | 
326  | 4  |                     encoded_key.insert_str(0, &self.key_prefix);  | 
327  | 4  |                     Cow::Owned(encoded_key)  | 
328  |  |                 }  | 
329  | 0  |                 Cow::Borrowed(body) => { | 
330  | 0  |                     let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());  | 
331  | 0  |                     encoded_key.push_str(&self.key_prefix);  | 
332  | 0  |                     encoded_key.push_str(body);  | 
333  | 0  |                     Cow::Owned(encoded_key)  | 
334  |  |                 }  | 
335  |  |             }  | 
336  |  |         }  | 
337  | 74  |     }  | 
338  |  | }  | 
339  |  |  | 
340  |  | #[async_trait]  | 
341  |  | impl StoreDriver for RedisStore { | 
342  |  |     async fn has_with_results(  | 
343  |  |         self: Pin<&Self>,  | 
344  |  |         keys: &[StoreKey<'_>],  | 
345  |  |         results: &mut [Option<u64>],  | 
346  | 7  |     ) -> Result<(), Error> { | 
347  |  |         // TODO(palfrey) We could use pipeline here, but it makes retry more  | 
348  |  |         // difficult and it doesn't work very well in cluster mode.  | 
349  |  |         // If we wanted to optimize this with pipeline be careful to  | 
350  |  |         // implement retry and to support cluster mode.  | 
351  |  |         let client = self.get_client().await?;  | 
352  |  |         keys.iter()  | 
353  |  |             .zip(results.iter_mut())  | 
354  | 6  |             .map(|(key, result)| async move { | 
355  |  |                 // We need to do a special pass to ensure our zero key exist.  | 
356  | 6  |                 if is_zero_digest(key.borrow()) {  Branch (356:20): [True: 2, False: 4]
   Branch (356:20): [Folded - Ignored]
  | 
357  | 2  |                     *result = Some(0);  | 
358  | 2  |                     return Ok::<_, Error>(());  | 
359  | 4  |                 }  | 
360  | 4  |                 let encoded_key = self.encode_key(key);  | 
361  | 4  |                 let pipeline = client.pipeline();  | 
362  | 4  |                 pipeline  | 
363  | 4  |                     .strlen::<(), _>(encoded_key.as_ref())  | 
364  | 4  |                     .await  | 
365  | 4  |                     .err_tip(|| {0   | 
366  | 0  |                         format!("In RedisStore::has_with_results::strlen for {encoded_key}") | 
367  | 0  |                     })?;  | 
368  |  |                 // Redis returns 0 when the key doesn't exist  | 
369  |  |                 // AND when the key exists with value of length 0.  | 
370  |  |                 // Therefore, we need to check both length and existence  | 
371  |  |                 // and do it in a pipeline for efficiency.  | 
372  | 4  |                 pipeline  | 
373  | 4  |                     .exists::<(), _>(encoded_key.as_ref())  | 
374  | 4  |                     .await  | 
375  | 4  |                     .err_tip(|| {0   | 
376  | 0  |                         format!("In RedisStore::has_with_results::exists for {encoded_key}") | 
377  | 0  |                     })?;  | 
378  | 4  |                 let (blob_len, exists) = pipeline  | 
379  | 4  |                     .all::<(u64, bool)>()  | 
380  | 4  |                     .await  | 
381  | 4  |                     .err_tip(|| "In RedisStore::has_with_results::query")?0 ;  | 
382  |  |  | 
383  | 4  |                 *result = if exists { Some(blob_len) } else { None0  };  Branch (383:30): [True: 4, False: 0]
   Branch (383:30): [Folded - Ignored]
  | 
384  |  |  | 
385  | 4  |                 Ok::<_, Error>(())  | 
386  | 12  |             })  | 
387  |  |             .collect::<FuturesUnordered<_>>()  | 
388  |  |             .try_collect()  | 
389  |  |             .await  | 
390  | 7  |     }  | 
391  |  |  | 
392  |  |     async fn list(  | 
393  |  |         self: Pin<&Self>,  | 
394  |  |         range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),  | 
395  |  |         handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),  | 
396  | 8  |     ) -> Result<u64, Error> { | 
397  |  |         let range = (  | 
398  |  |             range.0.map(StoreKey::into_owned),  | 
399  |  |             range.1.map(StoreKey::into_owned),  | 
400  |  |         );  | 
401  |  |         let pattern = match range.0 { | 
402  |  |             Bound::Included(ref start) | Bound::Excluded(ref start) => match range.1 { | 
403  |  |                 Bound::Included(ref end) | Bound::Excluded(ref end) => { | 
404  |  |                     let start = start.as_str();  | 
405  |  |                     let end = end.as_str();  | 
406  |  |                     let max_length = start.len().min(end.len());  | 
407  |  |                     let length = start  | 
408  |  |                         .chars()  | 
409  |  |                         .zip(end.chars())  | 
410  | 20  |                         .position(|(a, b)| a != b)  | 
411  |  |                         .unwrap_or(max_length);  | 
412  |  |                     format!("{}{}*", self.key_prefix, &start[..length]) | 
413  |  |                 }  | 
414  |  |                 Bound::Unbounded => format!("{}*", self.key_prefix), | 
415  |  |             },  | 
416  |  |             Bound::Unbounded => format!("{}*", self.key_prefix), | 
417  |  |         };  | 
418  |  |         let client = self.get_client().await?;  | 
419  |  |         let mut scan_stream = client.scan(pattern, Some(self.scan_count), None);  | 
420  |  |         let mut iterations = 0;  | 
421  |  |         'outer: while let Some(mut page) = scan_stream.try_next().await? { | 
422  |  |             if let Some(keys) = page.take_results() { | 
423  |  |                 for key in keys { | 
424  |  |                     // TODO: Notification of conversion errors  | 
425  |  |                     // Any results that do not conform to expectations are ignored.  | 
426  |  |                     if let Some(key) = key.as_str() { | 
427  |  |                         if let Some(key) = key.strip_prefix(&self.key_prefix) { | 
428  |  |                             let key = StoreKey::new_str(key);  | 
429  |  |                             if range.contains(&key) { | 
430  |  |                                 iterations += 1;  | 
431  |  |                                 if !handler(&key) { | 
432  |  |                                     break 'outer;  | 
433  |  |                                 }  | 
434  |  |                             }  | 
435  |  |                         }  | 
436  |  |                     }  | 
437  |  |                 }  | 
438  |  |             }  | 
439  |  |             page.next();  | 
440  |  |         }  | 
441  |  |         Ok(iterations)  | 
442  | 8  |     }  | 
443  |  |  | 
444  |  |     async fn update(  | 
445  |  |         self: Pin<&Self>,  | 
446  |  |         key: StoreKey<'_>,  | 
447  |  |         mut reader: DropCloserReadHalf,  | 
448  |  |         _upload_size: UploadSizeInfo,  | 
449  | 8  |     ) -> Result<(), Error> { | 
450  |  |         let final_key = self.encode_key(&key);  | 
451  |  |  | 
452  |  |         // While the name generation function can be supplied by the user, we need to have the curly  | 
453  |  |         // braces in place in order to manage redis' hashing behavior and make sure that the temporary  | 
454  |  |         // key name and the final key name are directed to the same cluster node. See  | 
455  |  |         // https://redis.io/blog/redis-clustering-best-practices-with-keys/  | 
456  |  |         //  | 
457  |  |         // The TL;DR is that if we're in cluster mode and the names hash differently, we can't use request  | 
458  |  |         // pipelining. By using these braces, we tell redis to only hash the part of the temporary key that's  | 
459  |  |         // identical to the final key -- so they will always hash to the same node.  | 
460  |  |         let temp_key = format!(  | 
461  |  |             "temp-{}-{{{}}}", | 
462  |  |             (self.temp_name_generator_fn)(),  | 
463  |  |             &final_key  | 
464  |  |         );  | 
465  |  |  | 
466  |  |         if is_zero_digest(key.borrow()) { | 
467  |  |             let chunk = reader  | 
468  |  |                 .peek()  | 
469  |  |                 .await  | 
470  |  |                 .err_tip(|| "Failed to peek in RedisStore::update")?;  | 
471  |  |             if chunk.is_empty() { | 
472  |  |                 reader  | 
473  |  |                     .drain()  | 
474  |  |                     .await  | 
475  |  |                     .err_tip(|| "Failed to drain in RedisStore::update")?;  | 
476  |  |                 // Zero-digest keys are special -- we don't need to do anything with it.  | 
477  |  |                 return Ok(());  | 
478  |  |             }  | 
479  |  |         }  | 
480  |  |  | 
481  |  |         let client = self.get_client().await?;  | 
482  |  |  | 
483  |  |         let mut read_stream = reader  | 
484  | 6  |             .scan(0u32, |bytes_read, chunk_res| { | 
485  | 6  |                 future::ready(Some(  | 
486  | 6  |                     chunk_res  | 
487  | 6  |                         .err_tip(|| "Failed to read chunk in update in redis store")  | 
488  | 6  |                         .and_then(|chunk| {5   | 
489  | 5  |                             let offset = *bytes_read;  | 
490  | 5  |                             let chunk_len = u32::try_from(chunk.len()).err_tip(  | 
491  |  |                                 || "Could not convert chunk length to u32 in RedisStore::update",  | 
492  | 0  |                             )?;  | 
493  | 5  |                             let new_bytes_read = bytes_read  | 
494  | 5  |                                 .checked_add(chunk_len)  | 
495  | 5  |                                 .err_tip(|| "Overflow protection in RedisStore::update")?0 ;  | 
496  | 5  |                             *bytes_read = new_bytes_read;  | 
497  | 5  |                             Ok::<_, Error>((offset, *bytes_read, chunk))  | 
498  | 5  |                         }),  | 
499  |  |                 ))  | 
500  | 6  |             })  | 
501  | 6  |             .map(|res| { | 
502  | 6  |                 let (offset5 , end_pos5 , chunk5 ) = res?1 ;  | 
503  | 5  |                 let temp_key_ref = &temp_key;  | 
504  | 5  |                 Ok(async move { | 
505  | 5  |                     client  | 
506  | 5  |                         .setrange::<(), _, _>(temp_key_ref, offset, chunk)  | 
507  | 5  |                         .await  | 
508  | 5  |                         .err_tip(  | 
509  | 0  |                             || format!("While appending to temp key ({temp_key_ref}) in RedisStore::update. offset = {offset}. end_pos = {end_pos}"), | 
510  | 0  |                         )?;  | 
511  | 5  |                     Ok::<u32, Error>(end_pos)  | 
512  | 5  |                 })  | 
513  | 6  |             })  | 
514  |  |             .try_buffer_unordered(self.max_chunk_uploads_per_update);  | 
515  |  |  | 
516  |  |         let mut total_len: u32 = 0;  | 
517  |  |         while let Some(last_pos) = read_stream.try_next().await? { | 
518  |  |             if last_pos > total_len { | 
519  |  |                 total_len = last_pos;  | 
520  |  |             }  | 
521  |  |         }  | 
522  |  |  | 
523  |  |         let blob_len = client  | 
524  |  |             .strlen::<u64, _>(&temp_key)  | 
525  |  |             .await  | 
526  | 0  |             .err_tip(|| format!("In RedisStore::update strlen check for {temp_key}"))?; | 
527  |  |         // This is a safety check to ensure that in the event some kind of retry was to happen  | 
528  |  |         // and the data was appended to the key twice, we reject the data.  | 
529  |  |         if blob_len != u64::from(total_len) { | 
530  |  |             return Err(make_input_err!(  | 
531  |  |                 "Data length mismatch in RedisStore::update for {}({}) - expected {} bytes, got {} bytes", | 
532  |  |                 key.borrow().as_str(),  | 
533  |  |                 temp_key,  | 
534  |  |                 total_len,  | 
535  |  |                 blob_len,  | 
536  |  |             ));  | 
537  |  |         }  | 
538  |  |  | 
539  |  |         // Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost.  | 
540  |  |         client  | 
541  |  |             .rename::<(), _, _>(&temp_key, final_key.as_ref())  | 
542  |  |             .await  | 
543  |  |             .err_tip(|| "While queueing key rename in RedisStore::update()")?;  | 
544  |  |  | 
545  |  |         // If we have a publish channel configured, send a notice that the key has been set.  | 
546  |  |         if let Some(pub_sub_channel) = &self.pub_sub_channel { | 
547  |  |             return Ok(client.publish(pub_sub_channel, final_key.as_ref()).await?);  | 
548  |  |         }  | 
549  |  |  | 
550  |  |         Ok(())  | 
551  | 8  |     }  | 
552  |  |  | 
553  |  |     async fn get_part(  | 
554  |  |         self: Pin<&Self>,  | 
555  |  |         key: StoreKey<'_>,  | 
556  |  |         writer: &mut DropCloserWriteHalf,  | 
557  |  |         offset: u64,  | 
558  |  |         length: Option<u64>,  | 
559  | 5  |     ) -> Result<(), Error> { | 
560  |  |         let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?;  | 
561  |  |         let length = length  | 
562  | 4  |             .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))  | 
563  |  |             .transpose()?;  | 
564  |  |  | 
565  |  |         // To follow RBE spec we need to consider any digest's with  | 
566  |  |         // zero size to be existing.  | 
567  |  |         if is_zero_digest(key.borrow()) { | 
568  |  |             return writer  | 
569  |  |                 .send_eof()  | 
570  |  |                 .err_tip(|| "Failed to send zero EOF in redis store get_part");  | 
571  |  |         }  | 
572  |  |  | 
573  |  |         let client = self.get_client().await?;  | 
574  |  |         let encoded_key = self.encode_key(&key);  | 
575  |  |         let encoded_key = encoded_key.as_ref();  | 
576  |  |  | 
577  |  |         // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we  | 
578  |  |         // do math with indices we change them to be exclusive at the end.  | 
579  |  |  | 
580  |  |         // We want to read the data at the key from `offset` to `offset + length`.  | 
581  |  |         let data_start = offset;  | 
582  |  |         let data_end = data_start  | 
583  |  |             .saturating_add(length.unwrap_or(isize::MAX as usize))  | 
584  |  |             .saturating_sub(1);  | 
585  |  |  | 
586  |  |         // And we don't ever want to read more than `read_chunk_size` bytes at a time, so we'll need to iterate.  | 
587  |  |         let mut chunk_start = data_start;  | 
588  |  |         let mut chunk_end = cmp::min(  | 
589  |  |             data_start.saturating_add(self.read_chunk_size) - 1,  | 
590  |  |             data_end,  | 
591  |  |         );  | 
592  |  |  | 
593  |  |         loop { | 
594  |  |             let chunk: Bytes = client  | 
595  |  |                 .getrange(encoded_key, chunk_start, chunk_end)  | 
596  |  |                 .await  | 
597  |  |                 .err_tip(|| "In RedisStore::get_part::getrange")?;  | 
598  |  |  | 
599  |  |             let didnt_receive_full_chunk = chunk.len() < self.read_chunk_size;  | 
600  |  |             let reached_end_of_data = chunk_end == data_end;  | 
601  |  |  | 
602  |  |             if didnt_receive_full_chunk || reached_end_of_data { | 
603  |  |                 if !chunk.is_empty() { | 
604  |  |                     writer  | 
605  |  |                         .send(chunk)  | 
606  |  |                         .await  | 
607  |  |                         .err_tip(|| "Failed to write data in RedisStore::get_part")?;  | 
608  |  |                 }  | 
609  |  |  | 
610  |  |                 break; // No more data to read.  | 
611  |  |             }  | 
612  |  |  | 
613  |  |             // We received a full chunk's worth of data, so write it...  | 
614  |  |             writer  | 
615  |  |                 .send(chunk)  | 
616  |  |                 .await  | 
617  |  |                 .err_tip(|| "Failed to write data in RedisStore::get_part")?;  | 
618  |  |  | 
619  |  |             // ...and go grab the next chunk.  | 
620  |  |             chunk_start = chunk_end + 1;  | 
621  |  |             chunk_end = cmp::min(  | 
622  |  |                 chunk_start.saturating_add(self.read_chunk_size) - 1,  | 
623  |  |                 data_end,  | 
624  |  |             );  | 
625  |  |         }  | 
626  |  |  | 
627  |  |         // If we didn't write any data, check if the key exists, if not return a NotFound error.  | 
628  |  |         // This is required by spec.  | 
629  |  |         if writer.get_bytes_written() == 0 { | 
630  |  |             // We're supposed to read 0 bytes, so just check if the key exists.  | 
631  |  |             let exists = client  | 
632  |  |                 .exists::<bool, _>(encoded_key)  | 
633  |  |                 .await  | 
634  |  |                 .err_tip(|| "In RedisStore::get_part::zero_exists")?;  | 
635  |  |  | 
636  |  |             if !exists { | 
637  |  |                 return Err(make_err!(  | 
638  |  |                     Code::NotFound,  | 
639  |  |                     "Data not found in Redis store for digest: {key:?}" | 
640  |  |                 ));  | 
641  |  |             }  | 
642  |  |         }  | 
643  |  |  | 
644  |  |         writer  | 
645  |  |             .send_eof()  | 
646  |  |             .err_tip(|| "Failed to write EOF in redis store get_part")  | 
647  | 5  |     }  | 
648  |  |  | 
649  | 0  |     fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver { | 
650  | 0  |         self  | 
651  | 0  |     }  | 
652  |  |  | 
653  | 0  |     fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) { | 
654  | 0  |         self  | 
655  | 0  |     }  | 
656  |  |  | 
657  | 0  |     fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> { | 
658  | 0  |         self  | 
659  | 0  |     }  | 
660  |  |  | 
661  | 0  |     fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) { | 
662  | 0  |         registry.register_indicator(self);  | 
663  | 0  |     }  | 
664  |  |  | 
665  | 0  |     fn register_remove_callback(  | 
666  | 0  |         self: Arc<Self>,  | 
667  | 0  |         _callback: Arc<dyn RemoveItemCallback>,  | 
668  | 0  |     ) -> Result<(), Error> { | 
669  |  |         // As redis doesn't drop stuff, we can just ignore this  | 
670  | 0  |         Ok(())  | 
671  | 0  |     }  | 
672  |  | }  | 
673  |  |  | 
674  |  | #[async_trait]  | 
675  |  | impl HealthStatusIndicator for RedisStore { | 
676  | 1  |     fn get_name(&self) -> &'static str { | 
677  | 1  |         "RedisStore"  | 
678  | 1  |     }  | 
679  |  |  | 
680  | 0  |     async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { | 
681  |  |         StoreDriver::check_health(Pin::new(self), namespace).await  | 
682  | 0  |     }  | 
683  |  | }  | 
684  |  |  | 
// -------------------------------------------------------------------
// Everything below this line is specific to the redis scheduler implementation.
// -------------------------------------------------------------------
688  |  |  | 
/// Upper bound on the number of results returned per cursor.
const MAX_COUNT_PER_CURSOR: u64 = 256;
/// How long, in milliseconds, a redis cursor may sit idle before it is closed.
const CURSOR_IDLE_MS: u64 = 2_000;
/// Field of the Redis hash under which the data is stored.
const DATA_FIELD_NAME: &str = "data";
/// Field of the Redis hash under which the version is stored.
const VERSION_FIELD_NAME: &str = "version";
/// Time to live of indexes, in seconds (24 hours). After this time redis may delete the index.
const INDEX_TTL_S: u64 = 24 * 60 * 60;
699  |  |  | 
700  |  | /// Lua script to set a key if the version matches.  | 
701  |  | /// Args:  | 
702  |  | ///   KEYS[1]: The key where the version is stored.  | 
703  |  | ///   ARGV[1]: The expected version.  | 
704  |  | ///   ARGV[2]: The new data.  | 
705  |  | ///   ARGV[3*]: Key-value pairs of additional data to include.  | 
706  |  | /// Returns:  | 
707  |  | ///   The new version if the version matches. nil is returned if the  | 
708  |  | ///   value was not set.  | 
709  |  | const LUA_VERSION_SET_SCRIPT: &str = formatcp!(  | 
710  |  |     r"  | 
711  |  | local key = KEYS[1]  | 
712  |  | local expected_version = tonumber(ARGV[1])  | 
713  |  | local new_data = ARGV[2]  | 
714  |  | local new_version = redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', 1) | 
715  |  | local i  | 
716  |  | local indexes = {{}} | 
717  |  |  | 
718  |  | if new_version-1 ~= expected_version then  | 
719  |  |     redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', -1) | 
720  |  |     return {{ 0, new_version-1 }} | 
721  |  | end  | 
722  |  | -- Skip first 2 argvs, as they are known inputs.  | 
723  |  | -- Remember: Lua is 1-indexed.  | 
724  |  | for i=3, #ARGV do  | 
725  |  |     indexes[i-2] = ARGV[i]  | 
726  |  | end  | 
727  |  |  | 
728  |  | -- In testing we witnessed redis sometimes not update our FT indexes  | 
729  |  | -- resulting in stale data. It appears if we delete our keys then insert  | 
730  |  | -- them again it works and reduces risk significantly.  | 
731  |  | redis.call('DEL', key) | 
732  |  | redis.call('HSET', key, '{DATA_FIELD_NAME}', new_data, '{VERSION_FIELD_NAME}', new_version, unpack(indexes)) | 
733  |  |  | 
734  |  | return {{ 1, new_version }} | 
735  |  | "  | 
736  |  | );  | 
737  |  |  | 
/// Hard-coded fingerprint of the `FT.CREATE` command template, as lowercase hex.
/// The `test` module below recomputes the value and fails if this literal drifts.
const FINGERPRINT_CREATE_INDEX_HEX: &str = "3e762c15";

#[cfg(test)]
mod test {
    use super::FINGERPRINT_CREATE_INDEX_HEX;

    /// Text of the `FT.CREATE` command template whose shape the fingerprint tracks.
    const CREATE_INDEX_TEMPLATE: &str = "FT.CREATE {} ON HASH PREFIX 1 {} NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA {} TAG CASESENSITIVE SORTABLE";

    /// Computes a CRC32-style checksum over [`CREATE_INDEX_TEMPLATE`].
    ///
    /// It does not need to be a standards-conforming CRC32 (there is no final
    /// inversion); it only has to be a stable identifier with a low chance of
    /// collision. Written with `while` loops so it stays usable in `const` context.
    const fn fingerprint_create_index_template() -> u32 {
        const POLY: u32 = 0xEDB8_8320;
        const DATA: &[u8] = CREATE_INDEX_TEMPLATE.as_bytes();
        let mut crc: u32 = 0xFFFF_FFFF;
        let mut pos = 0;
        while pos < DATA.len() {
            crc ^= DATA[pos] as u32;

            let mut bit = 0;
            while bit < 8 {
                // All ones when the low bit is set, zero otherwise, so the polynomial
                // is folded in only for set bits.
                let mask = 0u32.wrapping_sub(crc & 1);
                crc = (crc >> 1) ^ (POLY & mask);
                bit += 1;
            }
            pos += 1;
        }
        crc
    }

    /// Guards the hard-coded hex literal against drifting from the computed value.
    #[test]
    fn test_fingerprint_value() {
        let computed = format!("{:08x}", fingerprint_create_index_template());
        assert_eq!(computed, FINGERPRINT_CREATE_INDEX_HEX);
    }
}
784  |  |  | 
/// Builds the name of the index to create for a given field.
///
/// The result is `<prefix>_<field>_<sort>_<fingerprint>`, where `<sort>` is empty when
/// `$maybe_sort` is `None`. Embedding the fingerprint means a changed index definition
/// yields a new index name. Expands to `format_args!`, leaving rendering to the caller.
macro_rules! get_index_name {
    ($key_prefix:expr, $field_name:expr, $maybe_sort:expr) => {
        format_args!(
            "{}_{}_{}_{}",
            $key_prefix,
            $field_name,
            $maybe_sort.unwrap_or(""),
            FINGERPRINT_CREATE_INDEX_HEX
        )
    };
}
799  |  |  | 
/// Checks whether a string is safe to use as part of a Redis key.
///
/// Nothing is rewritten: the input is returned unchanged when every byte is an ASCII
/// letter, an ASCII digit or `_`, and `None` is returned otherwise.
const fn try_sanitize(s: &str) -> Option<&str> {
    // `for` loops and iterators are not usable in `const fn`, hence the manual index.
    // Staying `const` lets the compiler fold the whole check away for constant inputs.
    let bytes = s.as_bytes();
    let mut idx: usize = 0;
    while idx < bytes.len() {
        let byte = bytes[idx];
        let allowed = byte.is_ascii_alphanumeric() || byte == b'_';
        if !allowed {
            return None;
        }
        idx += 1;
    }
    Some(s)
}
821  |  |  | 
822  |  | /// An individual subscription to a key in Redis.  | 
823  |  | #[derive(Debug)]  | 
824  |  | pub struct RedisSubscription { | 
825  |  |     receiver: Option<tokio::sync::watch::Receiver<String>>,  | 
826  |  |     weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,  | 
827  |  | }  | 
828  |  |  | 
829  |  | impl SchedulerSubscription for RedisSubscription { | 
830  |  |     /// Wait for the subscription key to change.  | 
831  | 25  |     async fn changed(&mut self) -> Result<(), Error> { | 
832  | 25  |         let receiver = self  | 
833  | 25  |             .receiver  | 
834  | 25  |             .as_mut()  | 
835  | 25  |             .ok_or_else(|| make_err!(Code::Internal0 , "In RedisSubscription::changed::as_mut"))?0 ;  | 
836  | 25  |         receiver  | 
837  | 25  |             .changed()  | 
838  | 25  |             .await  | 
839  | 21  |             .map_err(|_| make_err!(Code::Internal0 , "In RedisSubscription::changed::changed"))  | 
840  | 21  |     }  | 
841  |  | }  | 
842  |  |  | 
843  |  | // If the subscription is dropped, we need to possibly remove the key from the  | 
844  |  | // subscribed keys map.  | 
845  |  | impl Drop for RedisSubscription { | 
846  | 6  |     fn drop(&mut self) { | 
847  | 6  |         let Some(receiver) = self.receiver.take() else {  Branch (847:13): [True: 6, False: 0]
   Branch (847:13): [Folded - Ignored]
  | 
848  | 0  |             warn!("RedisSubscription has already been dropped, nothing to do."); | 
849  | 0  |             return; // Already dropped, nothing to do.  | 
850  |  |         };  | 
851  | 6  |         let key = receiver.borrow().clone();  | 
852  |  |         // IMPORTANT: This must be dropped before receiver_count() is called.  | 
853  | 6  |         drop(receiver);  | 
854  | 6  |         let Some(subscribed_keys4 ) = self.weak_subscribed_keys.upgrade() else {   Branch (854:13): [True: 4, False: 2]
   Branch (854:13): [Folded - Ignored]
  | 
855  | 2  |             return; // Already dropped, nothing to do.  | 
856  |  |         };  | 
857  | 4  |         let mut subscribed_keys = subscribed_keys.write();  | 
858  | 4  |         let Some(value) = subscribed_keys.get(&key) else {  Branch (858:13): [True: 4, False: 0]
   Branch (858:13): [Folded - Ignored]
  | 
859  | 0  |             error!(  | 
860  | 0  |                 "Key {key} was not found in subscribed keys when checking if it should be removed." | 
861  |  |             );  | 
862  | 0  |             return;  | 
863  |  |         };  | 
864  |  |         // If we have no receivers, cleanup the entry from our map.  | 
865  | 4  |         if value.receiver_count() == 0 {  Branch (865:12): [True: 4, False: 0]
   Branch (865:12): [Folded - Ignored]
  | 
866  | 4  |             subscribed_keys.remove(key);  | 
867  | 4  |         }0   | 
868  | 6  |     }  | 
869  |  | }  | 
870  |  |  | 
871  |  | /// A publisher for a key in Redis.  | 
872  |  | #[derive(Debug)]  | 
873  |  | struct RedisSubscriptionPublisher { | 
874  |  |     sender: Mutex<tokio::sync::watch::Sender<String>>,  | 
875  |  | }  | 
876  |  |  | 
877  |  | impl RedisSubscriptionPublisher { | 
878  | 6  |     fn new(  | 
879  | 6  |         key: String,  | 
880  | 6  |         weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,  | 
881  | 6  |     ) -> (Self, RedisSubscription) { | 
882  | 6  |         let (sender, receiver) = tokio::sync::watch::channel(key);  | 
883  | 6  |         let publisher = Self { | 
884  | 6  |             sender: Mutex::new(sender),  | 
885  | 6  |         };  | 
886  | 6  |         let subscription = RedisSubscription { | 
887  | 6  |             receiver: Some(receiver),  | 
888  | 6  |             weak_subscribed_keys,  | 
889  | 6  |         };  | 
890  | 6  |         (publisher, subscription)  | 
891  | 6  |     }  | 
892  |  |  | 
893  | 0  |     fn subscribe(  | 
894  | 0  |         &self,  | 
895  | 0  |         weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,  | 
896  | 0  |     ) -> RedisSubscription { | 
897  | 0  |         let receiver = self.sender.lock().subscribe();  | 
898  | 0  |         RedisSubscription { | 
899  | 0  |             receiver: Some(receiver),  | 
900  | 0  |             weak_subscribed_keys,  | 
901  | 0  |         }  | 
902  | 0  |     }  | 
903  |  |  | 
904  | 4  |     fn receiver_count(&self) -> usize { | 
905  | 4  |         self.sender.lock().receiver_count()  | 
906  | 4  |     }  | 
907  |  |  | 
908  | 22  |     fn notify(&self) { | 
909  |  |         // TODO(https://github.com/sile/patricia_tree/issues/40) When this is addressed  | 
910  |  |         // we can remove the `Mutex` and use the mutable iterator directly.  | 
911  | 22  |         self.sender.lock().send_modify(|_| {}); | 
912  | 22  |     }  | 
913  |  | }  | 
914  |  |  | 
915  |  | #[derive(Debug)]  | 
916  |  | pub struct RedisSubscriptionManager { | 
917  |  |     subscribed_keys: Arc<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,  | 
918  |  |     tx_for_test: tokio::sync::mpsc::UnboundedSender<String>,  | 
919  |  |     _subscription_spawn: JoinHandleDropGuard<()>,  | 
920  |  | }  | 
921  |  |  | 
922  |  | impl RedisSubscriptionManager { | 
923  | 4  |     pub fn new(subscribe_client: SubscriberClient, pub_sub_channel: String) -> Self { | 
924  | 4  |         let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));  | 
925  | 4  |         let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);  | 
926  | 4  |         let (tx_for_test, mut rx_for_test) = tokio::sync::mpsc::unbounded_channel();  | 
927  |  |         Self { | 
928  | 4  |             subscribed_keys,  | 
929  | 4  |             tx_for_test,  | 
930  | 4  |             _subscription_spawn: spawn!("redis_subscribe_spawn", async move { | 
931  | 4  |                 let mut rx = subscribe_client.message_rx();  | 
932  |  |                 loop { | 
933  | 4  |                     if let Err(e0 ) = subscribe_client.subscribe(&pub_sub_channel).await {   Branch (933:28): [True: 0, False: 4]
   Branch (933:28): [Folded - Ignored]
  | 
934  | 0  |                         error!("Error subscribing to pattern - {e}"); | 
935  | 0  |                         return;  | 
936  | 4  |                     }  | 
937  | 4  |                     let mut reconnect_rx = subscribe_client.reconnect_rx();  | 
938  | 4  |                     let reconnect_fut = reconnect_rx.recv().fuse();  | 
939  | 4  |                     tokio::pin!(reconnect_fut);  | 
940  |  |                     loop { | 
941  | 21  |                         let key17  = select! {  | 
942  | 21  |                             value17  = rx_for_test.recv() => {  | 
943  | 17  |                                 let Some(value) = value else {  Branch (943:37): [True: 17, False: 0]
   Branch (943:37): [Folded - Ignored]
  | 
944  | 0  |                                     unreachable!("Channel should never close"); | 
945  |  |                                 };  | 
946  | 17  |                                 value.into()  | 
947  |  |                             },  | 
948  | 21  |                             msg0  = rx.recv() => {  | 
949  | 0  |                                 match msg { | 
950  | 0  |                                     Ok(msg) => { | 
951  | 0  |                                         if let RedisValue::String(s) = msg.value {  Branch (951:48): [True: 0, False: 0]
   Branch (951:48): [Folded - Ignored]
  | 
952  | 0  |                                             s  | 
953  |  |                                         } else { | 
954  | 0  |                                             error!("Received non-string message in RedisSubscriptionManager"); | 
955  | 0  |                                             continue;  | 
956  |  |                                         }  | 
957  |  |                                     },  | 
958  | 0  |                                     Err(e) => { | 
959  |  |                                         // Check to see if our parent has been dropped and if so kill spawn.  | 
960  | 0  |                                         if subscribed_keys_weak.upgrade().is_none() {  Branch (960:44): [True: 0, False: 0]
   Branch (960:44): [Folded - Ignored]
  | 
961  | 0  |                                             warn!("It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"); | 
962  | 0  |                                             return;  | 
963  | 0  |                                         }  | 
964  | 0  |                                         error!("Error receiving message in RedisSubscriptionManager reconnecting and flagging everything changed - {e}"); | 
965  | 0  |                                         break;  | 
966  |  |                                     }  | 
967  |  |                                 }  | 
968  |  |                             },  | 
969  | 21  |                             _ = &mut reconnect_fut => { | 
970  | 0  |                                 warn!("Redis reconnected flagging all subscriptions as changed and resuming"); | 
971  | 0  |                                 break;  | 
972  |  |                             }  | 
973  |  |                         };  | 
974  | 17  |                         let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {  Branch (974:29): [True: 17, False: 0]
   Branch (974:29): [Folded - Ignored]
  | 
975  | 0  |                             warn!(  | 
976  | 0  |                                 "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"  | 
977  |  |                             );  | 
978  | 0  |                             return;  | 
979  |  |                         };  | 
980  | 17  |                         let subscribed_keys_mux = subscribed_keys.read();  | 
981  | 17  |                         subscribed_keys_mux  | 
982  | 17  |                             .common_prefix_values(&*key)  | 
983  | 17  |                             .for_each(RedisSubscriptionPublisher::notify);  | 
984  |  |                     }  | 
985  |  |                     // Sleep for a small amount of time to ensure we don't reconnect too quickly.  | 
986  | 0  |                     sleep(Duration::from_secs(1)).await;  | 
987  |  |                     // If we reconnect or lag behind we might have had dirty keys, so we need to  | 
988  |  |                     // flag all of them as changed.  | 
989  | 0  |                     let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {  Branch (989:25): [True: 0, False: 0]
   Branch (989:25): [Folded - Ignored]
  | 
990  | 0  |                         warn!(  | 
991  | 0  |                             "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"  | 
992  |  |                         );  | 
993  | 0  |                         return;  | 
994  |  |                     };  | 
995  | 0  |                     let subscribed_keys_mux = subscribed_keys.read();  | 
996  |  |                     // Just in case also get a new receiver.  | 
997  | 0  |                     rx = subscribe_client.message_rx();  | 
998  |  |                     // Drop all buffered messages, then flag everything as changed.  | 
999  | 0  |                     rx.resubscribe();  | 
1000  | 0  |                     for publisher in subscribed_keys_mux.values() { | 
1001  | 0  |                         publisher.notify();  | 
1002  | 0  |                     }  | 
1003  |  |                 }  | 
1004  | 0  |             }),  | 
1005  |  |         }  | 
1006  | 4  |     }  | 
1007  |  | }  | 
1008  |  |  | 
1009  |  | impl SchedulerSubscriptionManager for RedisSubscriptionManager { | 
1010  |  |     type Subscription = RedisSubscription;  | 
1011  |  |  | 
1012  | 17  |     fn notify_for_test(&self, value: String) { | 
1013  | 17  |         self.tx_for_test.send(value).unwrap();  | 
1014  | 17  |     }  | 
1015  |  |  | 
1016  | 6  |     fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>  | 
1017  | 6  |     where  | 
1018  | 6  |         K: SchedulerStoreKeyProvider,  | 
1019  |  |     { | 
1020  | 6  |         let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);  | 
1021  | 6  |         let mut subscribed_keys = self.subscribed_keys.write();  | 
1022  | 6  |         let key = key.get_key();  | 
1023  | 6  |         let key_str = key.as_str();  | 
1024  | 6  |         let mut subscription = if let Some(publisher0 ) = subscribed_keys.get(&key_str) {   Branch (1024:39): [True: 0, False: 0]
   Branch (1024:39): [Folded - Ignored]
   Branch (1024:39): [Folded - Ignored]
   Branch (1024:39): [True: 0, False: 6]
  | 
1025  | 0  |             publisher.subscribe(weak_subscribed_keys)  | 
1026  |  |         } else { | 
1027  | 6  |             let (publisher, subscription) =  | 
1028  | 6  |                 RedisSubscriptionPublisher::new(key_str.to_string(), weak_subscribed_keys);  | 
1029  | 6  |             subscribed_keys.insert(key_str, publisher);  | 
1030  | 6  |             subscription  | 
1031  |  |         };  | 
1032  | 6  |         subscription  | 
1033  | 6  |             .receiver  | 
1034  | 6  |             .as_mut()  | 
1035  | 6  |             .ok_or_else(|| {0   | 
1036  | 0  |                 make_err!(  | 
1037  | 0  |                     Code::Internal,  | 
1038  |  |                     "Receiver should be set in RedisSubscriptionManager::subscribe"  | 
1039  |  |                 )  | 
1040  | 0  |             })?  | 
1041  | 6  |             .mark_changed();  | 
1042  |  |  | 
1043  | 6  |         Ok(subscription)  | 
1044  | 6  |     }  | 
1045  |  |  | 
1046  | 2  |     fn is_reliable() -> bool { | 
1047  | 2  |         false  | 
1048  | 2  |     }  | 
1049  |  | }  | 
1050  |  |  | 
1051  |  | impl SchedulerStore for RedisStore { | 
1052  |  |     type SubscriptionManager = RedisSubscriptionManager;  | 
1053  |  |  | 
1054  | 9  |     fn subscription_manager(&self) -> Result<Arc<RedisSubscriptionManager>, Error> { | 
1055  | 9  |         let mut subscription_manager = self.subscription_manager.lock();  | 
1056  |  |  | 
1057  | 9  |         if let Some(subscription_manager5 ) = &*subscription_manager {   Branch (1057:16): [True: 5, False: 4]
   Branch (1057:16): [Folded - Ignored]
  | 
1058  | 5  |             Ok(subscription_manager.clone())  | 
1059  |  |         } else { | 
1060  | 4  |             let Some(pub_sub_channel) = &self.pub_sub_channel else {  Branch (1060:17): [True: 4, False: 0]
   Branch (1060:17): [Folded - Ignored]
  | 
1061  | 0  |                 return Err(make_input_err!(  | 
1062  | 0  |                     "RedisStore must have a pubsub channel for a Redis Scheduler if using subscriptions"  | 
1063  | 0  |                 ));  | 
1064  |  |             };  | 
1065  | 4  |             let sub = Arc::new(RedisSubscriptionManager::new(  | 
1066  | 4  |                 self.subscriber_client.clone(),  | 
1067  | 4  |                 pub_sub_channel.clone(),  | 
1068  |  |             ));  | 
1069  | 4  |             *subscription_manager = Some(sub.clone());  | 
1070  | 4  |             Ok(sub)  | 
1071  |  |         }  | 
1072  | 9  |     }  | 
1073  |  |  | 
1074  | 19  |     async fn update_data<T>(&self, data: T) -> Result<Option<i64>, Error>  | 
1075  | 19  |     where  | 
1076  | 19  |         T: SchedulerStoreDataProvider  | 
1077  | 19  |             + SchedulerStoreKeyProvider  | 
1078  | 19  |             + SchedulerCurrentVersionProvider  | 
1079  | 19  |             + Send,  | 
1080  | 19  |     { | 
1081  | 19  |         let key = data.get_key();  | 
1082  | 19  |         let key = self.encode_key(&key);  | 
1083  | 19  |         let client = self.get_client().await?0 ;  | 
1084  | 19  |         let maybe_index = data.get_indexes().err_tip(|| {0   | 
1085  | 0  |             format!("Err getting index in RedisStore::update_data::versioned for {key:?}") | 
1086  | 0  |         })?;  | 
1087  | 19  |         if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {  Branch (1087:12): [True: 0, False: 0]
   Branch (1087:12): [True: 0, False: 0]
   Branch (1087:12): [Folded - Ignored]
   Branch (1087:12): [Folded - Ignored]
   Branch (1087:12): [True: 0, False: 4]
   Branch (1087:12): [True: 15, False: 0]
  | 
1088  | 15  |             let current_version = data.current_version();  | 
1089  | 15  |             let data = data.try_into_bytes().err_tip(|| {0   | 
1090  | 0  |                 format!("Could not convert value to bytes in RedisStore::update_data::versioned for {key:?}") | 
1091  | 0  |             })?;  | 
1092  | 15  |             let mut argv = Vec::with_capacity(3 + maybe_index.len() * 2);  | 
1093  | 15  |             argv.push(Bytes::from(format!("{current_version}"))); | 
1094  | 15  |             argv.push(data);  | 
1095  | 60  |             for (name45 , value45 ) in maybe_index {  | 
1096  | 45  |                 argv.push(Bytes::from_static(name.as_bytes()));  | 
1097  | 45  |                 argv.push(value);  | 
1098  | 45  |             }  | 
1099  | 15  |             let (success, new_version): (bool, i64) = self  | 
1100  | 15  |                 .update_if_version_matches_script  | 
1101  | 15  |                 .evalsha_with_reload(client, vec![key.as_ref()], argv)  | 
1102  | 15  |                 .await  | 
1103  | 15  |                 .err_tip(|| format!("In RedisStore::update_data::versioned for {key:?}"0 ))?0 ;  | 
1104  | 15  |             if !success {  Branch (1104:16): [True: 0, False: 0]
   Branch (1104:16): [True: 0, False: 0]
   Branch (1104:16): [Folded - Ignored]
   Branch (1104:16): [Folded - Ignored]
   Branch (1104:16): [True: 0, False: 0]
   Branch (1104:16): [True: 1, False: 14]
  | 
1105  | 1  |                 tracing::info!(  | 
1106  | 1  |                     "Error updating Redis key {key} expected version {current_version} but found {new_version}" | 
1107  |  |                 );  | 
1108  | 1  |                 return Ok(None);  | 
1109  | 14  |             }  | 
1110  |  |             // If we have a publish channel configured, send a notice that the key has been set.  | 
1111  | 14  |             if let Some(pub_sub_channel) = &self.pub_sub_channel {  Branch (1111:20): [True: 0, False: 0]
   Branch (1111:20): [True: 0, False: 0]
   Branch (1111:20): [Folded - Ignored]
   Branch (1111:20): [Folded - Ignored]
   Branch (1111:20): [True: 0, False: 0]
   Branch (1111:20): [True: 14, False: 0]
  | 
1112  | 14  |                 return Ok(client.publish(pub_sub_channel, key.as_ref()).await?0 );  | 
1113  | 0  |             }  | 
1114  | 0  |             Ok(Some(new_version))  | 
1115  |  |         } else { | 
1116  | 4  |             let data = data.try_into_bytes().err_tip(|| {0   | 
1117  | 0  |                 format!("Could not convert value to bytes in RedisStore::update_data::noversion for {key:?}") | 
1118  | 0  |             })?;  | 
1119  | 4  |             let mut fields = RedisMap::new();  | 
1120  | 4  |             fields.reserve(1 + maybe_index.len());  | 
1121  | 4  |             fields.insert(DATA_FIELD_NAME.into(), data.into());  | 
1122  | 4  |             for (name0 , value0 ) in maybe_index {  | 
1123  | 0  |                 fields.insert(name.into(), value.into());  | 
1124  | 0  |             }  | 
1125  | 4  |             client  | 
1126  | 4  |                 .hset::<(), _, _>(key.as_ref(), fields)  | 
1127  | 4  |                 .await  | 
1128  | 4  |                 .err_tip(|| format!("In RedisStore::update_data::noversion for {key:?}"0 ))?0 ;  | 
1129  |  |             // If we have a publish channel configured, send a notice that the key has been set.  | 
1130  | 4  |             if let Some(pub_sub_channel) = &self.pub_sub_channel {  Branch (1130:20): [True: 0, False: 0]
   Branch (1130:20): [True: 0, False: 0]
   Branch (1130:20): [Folded - Ignored]
   Branch (1130:20): [Folded - Ignored]
   Branch (1130:20): [True: 4, False: 0]
   Branch (1130:20): [True: 0, False: 0]
  | 
1131  | 4  |                 return Ok(client.publish(pub_sub_channel, key.as_ref()).await?0 );  | 
1132  | 0  |             }  | 
1133  | 0  |             Ok(Some(0)) // Always use "0" version since this is not a versioned request.  | 
1134  |  |         }  | 
1135  | 19  |     }  | 
1136  |  |  | 
1137  | 21  |     async fn search_by_index_prefix<K>(  | 
1138  | 21  |         &self,  | 
1139  | 21  |         index: K,  | 
1140  | 21  |     ) -> Result<  | 
1141  | 21  |         impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,  | 
1142  | 21  |         Error,  | 
1143  | 21  |     >  | 
1144  | 21  |     where  | 
1145  | 21  |         K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,  | 
1146  | 21  |     { | 
1147  | 21  |         let index_value = index.index_value();  | 
1148  | 22  |         let run_ft_aggregate21  = || {  | 
1149  | 22  |             let client = self.client_pool.next().clone();  | 
1150  | 22  |             let sanitized_field = try_sanitize(index_value.as_ref()).err_tip(|| {0   | 
1151  | 0  |                 format!("In RedisStore::search_by_index_prefix::try_sanitize - {index_value:?}") | 
1152  | 0  |             })?;  | 
1153  | 22  |             Ok::<_, Error>(async move { | 
1154  | 22  |                 ft_aggregate(  | 
1155  | 22  |                     client,  | 
1156  | 22  |                     format!(  | 
1157  | 22  |                         "{}", | 
1158  | 22  |                         get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)  | 
1159  |  |                     ),  | 
1160  | 22  |                     format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_field), | 
1161  |  |                     FtAggregateOptions { | 
1162  | 22  |                         load: Some(Load::Some(vec![  | 
1163  | 22  |                             SearchField { | 
1164  | 22  |                                 identifier: DATA_FIELD_NAME.into(),  | 
1165  | 22  |                                 property: None,  | 
1166  | 22  |                             },  | 
1167  | 22  |                             SearchField { | 
1168  | 22  |                                 identifier: VERSION_FIELD_NAME.into(),  | 
1169  | 22  |                                 property: None,  | 
1170  | 22  |                             },  | 
1171  | 22  |                         ])),  | 
1172  | 22  |                         cursor: Some(WithCursor { | 
1173  | 22  |                             count: Some(MAX_COUNT_PER_CURSOR),  | 
1174  | 22  |                             max_idle: Some(CURSOR_IDLE_MS),  | 
1175  | 22  |                         }),  | 
1176  | 22  |                         pipeline: vec![AggregateOperation::SortBy { | 
1177  | 22  |                             properties: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| {17   | 
1178  | 17  |                                 vec![(format!("@{v}").into(), SortOrder::Asc)] | 
1179  | 17  |                             }),  | 
1180  | 22  |                             max: None,  | 
1181  |  |                         }],  | 
1182  | 22  |                         ..Default::default()  | 
1183  |  |                     },  | 
1184  |  |                 )  | 
1185  | 22  |                 .await  | 
1186  | 22  |             })  | 
1187  | 22  |         };  | 
1188  | 21  |         let stream = run_ft_aggregate()?0   | 
1189  | 21  |             .or_else(|_| async move {1   | 
1190  | 1  |                 let mut schema = vec![SearchSchema { | 
1191  | 1  |                     field_name: K::INDEX_NAME.into(),  | 
1192  | 1  |                     alias: None,  | 
1193  | 1  |                     kind: SearchSchemaKind::Tag { | 
1194  | 1  |                         sortable: false,  | 
1195  | 1  |                         unf: false,  | 
1196  | 1  |                         separator: None,  | 
1197  | 1  |                         casesensitive: false,  | 
1198  | 1  |                         withsuffixtrie: false,  | 
1199  | 1  |                         noindex: false,  | 
1200  | 1  |                     },  | 
1201  | 1  |                 }];  | 
1202  | 1  |                 if let Some(sort_key0 ) = K::MAYBE_SORT_KEY {   Branch (1202:24): [True: 0, False: 0]
  Branch (1202:24): [True: 0, False: 0]
   Branch (1202:24): [Folded - Ignored]
   Branch (1202:24): [Folded - Ignored]
   Branch (1202:24): [True: 0, False: 0]
  Branch (1202:24): [True: 0, False: 1]
  | 
1203  | 0  |                     schema.push(SearchSchema { | 
1204  | 0  |                         field_name: sort_key.into(),  | 
1205  | 0  |                         alias: None,  | 
1206  | 0  |                         kind: SearchSchemaKind::Tag { | 
1207  | 0  |                             sortable: true,  | 
1208  | 0  |                             unf: false,  | 
1209  | 0  |                             separator: None,  | 
1210  | 0  |                             casesensitive: false,  | 
1211  | 0  |                             withsuffixtrie: false,  | 
1212  | 0  |                             noindex: false,  | 
1213  | 0  |                         },  | 
1214  | 0  |                     });  | 
1215  | 1  |                 }  | 
1216  | 1  |                 let create_result = self  | 
1217  | 1  |                     .client_pool  | 
1218  | 1  |                     .next()  | 
1219  | 1  |                     .ft_create::<(), _>(  | 
1220  | 1  |                         format!(  | 
1221  | 1  |                             "{}", | 
1222  | 1  |                             get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)  | 
1223  | 1  |                         ),  | 
1224  | 1  |                         FtCreateOptions { | 
1225  | 1  |                             on: Some(IndexKind::Hash),  | 
1226  | 1  |                             prefixes: vec![K::KEY_PREFIX.into()],  | 
1227  | 1  |                             nohl: true,  | 
1228  | 1  |                             nofields: true,  | 
1229  | 1  |                             nofreqs: true,  | 
1230  | 1  |                             nooffsets: true,  | 
1231  | 1  |                             temporary: Some(INDEX_TTL_S),  | 
1232  | 1  |                             ..Default::default()  | 
1233  | 1  |                         },  | 
1234  | 1  |                         schema,  | 
1235  | 1  |                     )  | 
1236  | 1  |                     .await  | 
1237  | 1  |                     .err_tip(|| {0   | 
1238  | 0  |                         format!(  | 
1239  | 0  |                             "Error with ft_create in RedisStore::search_by_index_prefix({})", | 
1240  | 0  |                             get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),  | 
1241  |  |                         )  | 
1242  | 0  |                     });  | 
1243  | 1  |                 let run_result = run_ft_aggregate()?0 .await.err_tip(|| {0   | 
1244  | 0  |                     format!(  | 
1245  | 0  |                         "Error with second ft_aggregate in RedisStore::search_by_index_prefix({})", | 
1246  | 0  |                         get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),  | 
1247  |  |                     )  | 
1248  | 0  |                 });  | 
1249  |  |                 // Creating the index will race which is ok. If it fails to create, we only  | 
1250  |  |                 // error if the second ft_aggregate call fails and fails to create.  | 
1251  | 1  |                 run_result.or_else(move |e| create_result0 .merge0 (Err(e)0 ))  | 
1252  | 2  |             })  | 
1253  | 21  |             .await?0 ;  | 
1254  | 21  |         Ok(stream.map(|result| {7   | 
1255  | 7  |             let mut redis_map =  | 
1256  | 7  |                 result.err_tip(|| "Error in stream of in RedisStore::search_by_index_prefix")?0 ;  | 
1257  | 7  |             let bytes_data = redis_map  | 
1258  | 7  |                 .remove(&RedisKey::from_static_str(DATA_FIELD_NAME))  | 
1259  | 7  |                 .err_tip(|| "Missing data field in RedisStore::search_by_index_prefix")?0   | 
1260  | 7  |                 .into_bytes()  | 
1261  | 7  |                 .err_tip(|| {0   | 
1262  | 0  |                     formatcp!("'{DATA_FIELD_NAME}' is not Bytes in RedisStore::search_by_index_prefix::into_bytes") | 
1263  | 0  |                 })?;  | 
1264  | 7  |             let version = if <K as SchedulerIndexProvider>::Versioned::VALUE {  Branch (1264:30): [True: 0, False: 0]
  Branch (1264:30): [True: 0, False: 0]
   Branch (1264:30): [Folded - Ignored]
   Branch (1264:30): [Folded - Ignored]
   Branch (1264:30): [True: 5, False: 0]
  Branch (1264:30): [True: 2, False: 0]
  | 
1265  | 7  |                 redis_map  | 
1266  | 7  |                     .remove(&RedisKey::from_static_str(VERSION_FIELD_NAME))  | 
1267  | 7  |                     .err_tip(|| "Missing version field in RedisStore::search_by_index_prefix")?0   | 
1268  | 7  |                     .as_i64()  | 
1269  | 7  |                     .err_tip(|| {0   | 
1270  | 0  |                         formatcp!("'{VERSION_FIELD_NAME}' is not u64 in RedisStore::search_by_index_prefix::as_u64") | 
1271  | 0  |                     })?  | 
1272  |  |             } else { | 
1273  | 0  |                 0  | 
1274  |  |             };  | 
1275  | 7  |             K::decode(version, bytes_data)  | 
1276  | 7  |                 .err_tip(|| "In RedisStore::search_by_index_prefix::decode")  | 
1277  | 7  |         }))  | 
1278  | 21  |     }  | 
1279  |  |  | 
1280  | 38  |     async fn get_and_decode<K>(  | 
1281  | 38  |         &self,  | 
1282  | 38  |         key: K,  | 
1283  | 38  |     ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>  | 
1284  | 38  |     where  | 
1285  | 38  |         K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,  | 
1286  | 38  |     { | 
1287  | 38  |         let key = key.get_key();  | 
1288  | 38  |         let key = self.encode_key(&key);  | 
1289  | 38  |         let client = self.get_client().await?0 ;  | 
1290  | 38  |         let (maybe_version37 , maybe_data37 ) = client  | 
1291  | 38  |             .hmget::<(Option<i64>, Option<Bytes>), _, _>(  | 
1292  | 38  |                 key.as_ref(),  | 
1293  | 38  |                 vec![  | 
1294  | 38  |                     RedisKey::from(VERSION_FIELD_NAME),  | 
1295  | 38  |                     RedisKey::from(DATA_FIELD_NAME),  | 
1296  | 38  |                 ],  | 
1297  | 38  |             )  | 
1298  | 38  |             .await  | 
1299  | 38  |             .err_tip(|| format!("In RedisStore::get_without_version::notversioned {key}"1 ))?1 ;  | 
1300  | 37  |         let Some(data) = maybe_data else {  Branch (1300:13): [True: 0, False: 0]
   Branch (1300:13): [True: 0, False: 0]
   Branch (1300:13): [Folded - Ignored]
   Branch (1300:13): [Folded - Ignored]
   Branch (1300:13): [True: 3, False: 0]
   Branch (1300:13): [True: 34, False: 0]
  | 
1301  | 0  |             return Ok(None);  | 
1302  |  |         };  | 
1303  | 37  |         Ok(Some(K::decode(maybe_version.unwrap_or(0), data).err_tip(  | 
1304  | 0  |             || format!("In RedisStore::get_with_version::notversioned::decode {key}"), | 
1305  | 0  |         )?))  | 
1306  | 38  |     }  | 
1307  |  | }  |