/build/source/nativelink-store/src/redis_store.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::borrow::Cow; |
16 | | use std::cmp; |
17 | | use std::pin::Pin; |
18 | | use std::sync::{Arc, Weak}; |
19 | | use std::time::Duration; |
20 | | |
21 | | use async_trait::async_trait; |
22 | | use bytes::Bytes; |
23 | | use const_format::formatcp; |
24 | | use fred::clients::{RedisClient, RedisPool, SubscriberClient}; |
25 | | use fred::interfaces::{ClientLike, KeysInterface, PubsubInterface}; |
26 | | use fred::prelude::{EventInterface, HashesInterface, RediSearchInterface}; |
27 | | use fred::types::{ |
28 | | Builder, ConnectionConfig, FtCreateOptions, PerformanceConfig, ReconnectPolicy, RedisConfig, |
29 | | RedisKey, RedisMap, RedisValue, Script, SearchSchema, SearchSchemaKind, |
30 | | }; |
31 | | use futures::{FutureExt, Stream, StreamExt}; |
32 | | use nativelink_config::stores::RedisMode; |
33 | | use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; |
34 | | use nativelink_metric::MetricsComponent; |
35 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
36 | | use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; |
37 | | use nativelink_util::spawn; |
38 | | use nativelink_util::store_trait::{ |
39 | | BoolValue, SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore, |
40 | | SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider, |
41 | | SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo, |
42 | | }; |
43 | | use nativelink_util::task::JoinHandleDropGuard; |
44 | | use parking_lot::{Mutex, RwLock}; |
45 | | use patricia_tree::StringPatriciaMap; |
46 | | use tokio::select; |
47 | | use tokio::time::sleep; |
48 | | use tracing::{event, Level}; |
49 | | use uuid::Uuid; |
50 | | |
51 | | use crate::cas_utils::is_zero_digest; |
52 | | use crate::redis_utils::ft_aggregate; |
53 | | |
54 | | // TODO(caass): These (and other settings) should be made configurable via nativelink-config. |
55 | | pub const READ_CHUNK_SIZE: usize = 64 * 1024; |
56 | | const CONNECTION_POOL_SIZE: usize = 3; |
57 | | |
/// Renders `value` as a zero-padded, eight-digit, lowercase hexadecimal string.
///
/// Used as the `handler` for the `fingerprint_create_index` metric, which is
/// presumably why it takes its argument by reference.
#[allow(clippy::trivially_copy_pass_by_ref)]
fn to_hex(value: &u32) -> String {
    let raw = *value;
    format!("{raw:08x}")
}
62 | | |
/// A [`StoreDriver`] implementation that uses Redis as a backing store.
///
/// Besides plain key/value storage, the struct also carries the state needed by
/// the scheduler-specific code further down in this file (the versioned-update
/// Lua script and the subscription manager).
#[derive(MetricsComponent)]
pub struct RedisStore {
    /// The client pool connecting to the backing Redis instance(s).
    client_pool: RedisPool,

    /// A channel to publish updates to when a key is added, removed, or modified.
    /// `None` disables publishing.
    #[metric(
        help = "The pubsub channel to publish updates to when a key is added, removed, or modified"
    )]
    pub_sub_channel: Option<String>,

    /// A redis client for managing subscriptions.
    /// TODO: This should be moved into the store in followups once a standard use pattern has been determined.
    subscriber_client: SubscriberClient,

    /// Fingerprint of the `FT.CREATE` command template. For metrics only.
    #[metric(
        help = "A unique identifier for the FT.CREATE command used to create the index template",
        handler = to_hex
    )]
    fingerprint_create_index: u32,

    /// A function used to generate names for temporary keys.
    temp_name_generator_fn: fn() -> String,

    /// A common prefix that is prepended to all keys before they are sent to Redis.
    ///
    /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`).
    #[metric(help = "Prefix to append to all keys before sending to Redis")]
    key_prefix: String,

    /// Redis script used to update a value in redis if the version matches.
    /// This is done by incrementing the version number and then setting the new data
    /// only if the version number matches the existing version number.
    update_if_version_matches_script: Script,

    /// A manager for subscriptions to keys in Redis. Starts out as `None`.
    subscription_manager: Mutex<Option<Arc<RedisSubscriptionManager>>>,
}
103 | | |
104 | | impl RedisStore { |
105 | | /// Create a new `RedisStore` from the given configuration. |
106 | 0 | pub fn new(config: &nativelink_config::stores::RedisStore) -> Result<Arc<Self>, Error> { |
107 | 0 | if config.addresses.is_empty() { Branch (107:12): [True: 0, False: 0]
Branch (107:12): [Folded - Ignored]
|
108 | 0 | return Err(make_err!( |
109 | 0 | Code::InvalidArgument, |
110 | 0 | "No addresses were specified in redis store configuration." |
111 | 0 | )); |
112 | 0 | }; |
113 | 0 | let [addr] = config.addresses.as_slice() else { Branch (113:13): [True: 0, False: 0]
Branch (113:13): [Folded - Ignored]
|
114 | 0 | return Err(make_err!(Code::Unimplemented, "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discover to find the other nodes.")); |
115 | | }; |
116 | 0 | let redis_config = match config.mode { |
117 | 0 | RedisMode::Cluster => RedisConfig::from_url_clustered(addr), |
118 | 0 | RedisMode::Sentinel => RedisConfig::from_url_sentinel(addr), |
119 | 0 | RedisMode::Standard => RedisConfig::from_url_centralized(addr), |
120 | | } |
121 | 0 | .err_tip_with_code(|e| { |
122 | 0 | ( |
123 | 0 | Code::InvalidArgument, |
124 | 0 | format!("while parsing redis node address: {e}"), |
125 | 0 | ) |
126 | 0 | })?; |
127 | | |
128 | 0 | let mut builder = Builder::from_config(redis_config); |
129 | 0 | builder |
130 | 0 | .set_performance_config(PerformanceConfig { |
131 | 0 | default_command_timeout: Duration::from_secs(config.response_timeout_s), |
132 | 0 | ..Default::default() |
133 | 0 | }) |
134 | 0 | .set_connection_config(ConnectionConfig { |
135 | 0 | connection_timeout: Duration::from_secs(config.connection_timeout_s), |
136 | 0 | internal_command_timeout: Duration::from_secs(config.response_timeout_s), |
137 | 0 | ..Default::default() |
138 | 0 | }) |
139 | 0 | // TODO(caass): Make this configurable. |
140 | 0 | .set_policy(ReconnectPolicy::new_constant(1, 0)); |
141 | 0 |
|
142 | 0 | Self::new_from_builder_and_parts( |
143 | 0 | builder, |
144 | 0 | config.experimental_pub_sub_channel.clone(), |
145 | 0 | || Uuid::new_v4().to_string(), |
146 | 0 | config.key_prefix.clone(), |
147 | 0 | ) |
148 | 0 | .map(Arc::new) |
149 | 0 | } |
150 | | |
151 | | /// Used for testing when determinism is required. |
152 | 10 | pub fn new_from_builder_and_parts( |
153 | 10 | mut builder: Builder, |
154 | 10 | pub_sub_channel: Option<String>, |
155 | 10 | temp_name_generator_fn: fn() -> String, |
156 | 10 | key_prefix: String, |
157 | 10 | ) -> Result<Self, Error> { |
158 | 10 | let client_pool = builder |
159 | 10 | .set_performance_config(PerformanceConfig { |
160 | 10 | broadcast_channel_capacity: 4096, |
161 | 10 | ..Default::default() |
162 | 10 | }) |
163 | 10 | .build_pool(CONNECTION_POOL_SIZE) |
164 | 10 | .err_tip(|| "while creating redis connection pool"0 )?0 ; |
165 | | |
166 | 10 | let subscriber_client = builder |
167 | 10 | .build_subscriber_client() |
168 | 10 | .err_tip(|| "while creating redis subscriber client"0 )?0 ; |
169 | | // Fires off a background task using `tokio::spawn`. |
170 | 10 | client_pool.connect(); |
171 | 10 | subscriber_client.connect(); |
172 | 10 | |
173 | 10 | Ok(Self { |
174 | 10 | client_pool, |
175 | 10 | pub_sub_channel, |
176 | 10 | subscriber_client, |
177 | 10 | fingerprint_create_index: fingerprint_create_index_template(), |
178 | 10 | temp_name_generator_fn, |
179 | 10 | key_prefix, |
180 | 10 | update_if_version_matches_script: Script::from_lua(LUA_VERSION_SET_SCRIPT), |
181 | 10 | subscription_manager: Mutex::new(None), |
182 | 10 | }) |
183 | 10 | } |
184 | | |
185 | | /// Encode a [`StoreKey`] so it can be sent to Redis. |
186 | 21 | fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> { |
187 | 21 | let key_body = key.as_str(); |
188 | 21 | if self.key_prefix.is_empty() { Branch (188:12): [True: 17, False: 4]
Branch (188:12): [Folded - Ignored]
|
189 | 17 | key_body |
190 | | } else { |
191 | | // This is in the hot path for all redis operations, so we try to reuse the allocation |
192 | | // from `key.as_str()` if possible. |
193 | 4 | match key_body { |
194 | 4 | Cow::Owned(mut encoded_key) => { |
195 | 4 | encoded_key.insert_str(0, &self.key_prefix); |
196 | 4 | Cow::Owned(encoded_key) |
197 | | } |
198 | 0 | Cow::Borrowed(body) => { |
199 | 0 | let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len()); |
200 | 0 | encoded_key.push_str(&self.key_prefix); |
201 | 0 | encoded_key.push_str(body); |
202 | 0 | Cow::Owned(encoded_key) |
203 | | } |
204 | | } |
205 | | } |
206 | 21 | } |
207 | | |
208 | 0 | pub fn get_client(&self) -> RedisClient { |
209 | 0 | self.client_pool.next().clone() |
210 | 0 | } |
211 | | } |
212 | | |
213 | | #[async_trait] |
214 | | impl StoreDriver for RedisStore { |
215 | | async fn has_with_results( |
216 | | self: Pin<&Self>, |
217 | | keys: &[StoreKey<'_>], |
218 | | results: &mut [Option<u64>], |
219 | 6 | ) -> Result<(), Error> { |
220 | | // TODO(caass): Optimize for the case where `keys.len() == 1` |
221 | 6 | let pipeline = self.client_pool.next().pipeline(); |
222 | 6 | |
223 | 6 | results.iter_mut().for_each(|result| *result = None); |
224 | | |
225 | 6 | for (idx, key) in keys.iter().enumerate() { |
226 | | // Don't bother with zero-length digests. |
227 | 6 | if is_zero_digest(key.borrow()) { Branch (227:16): [True: 2, False: 4]
Branch (227:16): [Folded - Ignored]
|
228 | 2 | results[idx] = Some(0); |
229 | 2 | continue; |
230 | 4 | } |
231 | 4 | |
232 | 4 | let encoded_key = self.encode_key(key); |
233 | 4 | |
234 | 4 | // This command is queued in memory, but not yet sent down the pipeline; the `await` returns instantly. |
235 | 4 | pipeline |
236 | 4 | .strlen::<(), _>(encoded_key.as_ref()) |
237 | 0 | .await |
238 | 4 | .err_tip(|| "In RedisStore::has_with_results"0 )?0 ; |
239 | | } |
240 | | |
241 | | // Send the queued commands. |
242 | 6 | let mut responses = pipeline.all::<Vec<_>>().await4 ?0 .into_iter(); |
243 | 6 | let mut remaining_results = results.iter_mut().filter(|option| { |
244 | 6 | // Anything that's `Some` was already set from `is_zero_digest`. |
245 | 6 | option.is_none() |
246 | 6 | }); |
247 | | |
248 | | // Similar to `Iterator::zip`, but with some verification at the end that the lengths were equal. |
249 | 10 | while let (Some(response), Some(result_slot4 )) = (responses.next(), remaining_results.next()) Branch (249:19): [True: 4, False: 6]
Branch (249:19): [Folded - Ignored]
|
250 | | { |
251 | 4 | if response == 0 { Branch (251:16): [True: 0, False: 4]
Branch (251:16): [Folded - Ignored]
|
252 | | // Redis returns 0 when the key doesn't exist AND when the key exists with value of length 0. |
253 | | // Since we already checked zero-lengths with `is_zero_digest`, this means the value doesn't exist. |
254 | 0 | continue; |
255 | 4 | } |
256 | 4 | |
257 | 4 | *result_slot = Some(response); |
258 | | } |
259 | | |
260 | 6 | if responses.next().is_some() { Branch (260:12): [True: 0, False: 6]
Branch (260:12): [Folded - Ignored]
|
261 | 0 | Err(make_err!( |
262 | 0 | Code::Internal, |
263 | 0 | "Received more responses than expected in RedisStore::has_with_results" |
264 | 0 | )) |
265 | 6 | } else if remaining_results.next().is_some() { Branch (265:19): [True: 0, False: 6]
Branch (265:19): [Folded - Ignored]
|
266 | 0 | Err(make_err!( |
267 | 0 | Code::Internal, |
268 | 0 | "Received fewer responses than expected in RedisStore::has_with_results" |
269 | 0 | )) |
270 | | } else { |
271 | 6 | Ok(()) |
272 | | } |
273 | 12 | } |
274 | | |
275 | | async fn update( |
276 | | self: Pin<&Self>, |
277 | | key: StoreKey<'_>, |
278 | | mut reader: DropCloserReadHalf, |
279 | | _upload_size: UploadSizeInfo, |
280 | 7 | ) -> Result<(), Error> { |
281 | 7 | let final_key = self.encode_key(&key); |
282 | 7 | |
283 | 7 | // While the name generation function can be supplied by the user, we need to have the curly |
284 | 7 | // braces in place in order to manage redis' hashing behavior and make sure that the temporary |
285 | 7 | // key name and the final key name are directed to the same cluster node. See |
286 | 7 | // https://redis.io/blog/redis-clustering-best-practices-with-keys/ |
287 | 7 | // |
288 | 7 | // The TL;DR is that if we're in cluster mode and the names hash differently, we can't use request |
289 | 7 | // pipelining. By using these braces, we tell redis to only hash the part of the temporary key that's |
290 | 7 | // identical to the final key -- so they will always hash to the same node. |
291 | 7 | let temp_key = format!( |
292 | 7 | "temp-{}-{{{}}}", |
293 | 7 | (self.temp_name_generator_fn)(), |
294 | 7 | &final_key |
295 | 7 | ); |
296 | 7 | |
297 | 7 | let client = self.client_pool.next(); |
298 | 7 | |
299 | 7 | // This loop is a little confusing at first glance, but essentially the process is: |
300 | 7 | // - Get as much data from the reader as possible |
301 | 7 | // - When the reader is empty, but the writer isn't done sending data, write that data to redis |
302 | 7 | // - When the writer is done sending data, write the data and break from the loop |
303 | 7 | // |
304 | 7 | // At one extreme, we could append data in redis every time we read some bytes -- that is, make one TCP request |
305 | 7 | // per channel read. This is wasteful since we anticipate reading many small chunks of bytes from the reader. |
306 | 7 | // |
307 | 7 | // At the other extreme, we could make a single TCP request to write all of the data all at once. |
308 | 7 | // This could also be an issue if we read loads of data, since we'd send one massive TCP request |
309 | 7 | // rather than a few moderately-sized requests. |
310 | 7 | // |
311 | 7 | // To compromise, we buffer opportunistically -- when the reader doesn't have any data ready to read, but it's |
312 | 7 | // not done getting data, we flush the data we _have_ read to redis before waiting for the reader to get more. |
313 | 7 | // |
314 | 7 | // As a result of this, there will be a span of time where a key in Redis has only partial data. We want other |
315 | 7 | // observers to notice atomic updates to keys, rather than partial updates, so we first write to a temporary key |
316 | 7 | // and then rename that key once we're done appending data. |
317 | 7 | let mut is_first_chunk = true; |
318 | 7 | let mut eof_reached = false; |
319 | 7 | let mut pipe = client.pipeline(); |
320 | | |
321 | 16 | while !eof_reached { Branch (321:15): [True: 12, False: 4]
Branch (321:15): [Folded - Ignored]
|
322 | 12 | pipe = client.pipeline(); |
323 | 12 | let mut pipe_size = 0; |
324 | | const MAX_PIPE_SIZE: usize = 5 * 1024 * 1024; // 5 MB |
325 | | |
326 | 12 | let chunk11 = reader |
327 | 12 | .recv() |
328 | 3 | .await |
329 | 12 | .err_tip(|| "Failed to reach chunk in update in redis store"1 )?1 ; |
330 | | |
331 | 11 | if chunk.is_empty() { Branch (331:16): [True: 6, False: 5]
Branch (331:16): [Folded - Ignored]
|
332 | | // There are three cases where we receive an empty chunk: |
333 | | // 1. The first chunk of a zero-digest key. We're required to treat all zero-digest keys as if they exist |
334 | | // and are empty per the RBE spec, so we can just return early as if we've pushed it -- any attempts to |
335 | | // read the value later will similarly avoid the network trip. |
336 | | // 2. This is an empty first chunk of a non-zero-digest key. In this case, we _do_ need to push up an |
337 | | // empty key, but can skip the rest of the process around renaming since there's only the one operation. |
338 | | // 3. This is the last chunk (EOF) of a regular key. In that case we can skip pushing this chunk. |
339 | | // |
340 | | // In all three cases, we're done pushing data and can move it from the temporary key to the final key. |
341 | 6 | if is_first_chunk && is_zero_digest(key.borrow())2 { Branch (341:20): [True: 2, False: 4]
Branch (341:38): [True: 2, False: 0]
Branch (341:20): [Folded - Ignored]
Branch (341:38): [Folded - Ignored]
|
342 | | // Case 1, a zero-digest key. |
343 | 2 | return Ok(()); |
344 | 4 | } else if is_first_chunk { Branch (344:27): [True: 0, False: 4]
Branch (344:27): [Folded - Ignored]
|
345 | | // Case 2, an empty non-zero-digest key. |
346 | 0 | pipe.append::<(), _, _>(&temp_key, "") |
347 | 0 | .await |
348 | 0 | .err_tip(|| "While appending to temp key in RedisStore::update")?; |
349 | 4 | }; |
350 | | |
351 | | // Note: setting `eof_reached = true` and calling `continue` is semantically equivalent to `break`. |
352 | | // Since we need to use the `eof_reached` flag in the inner loop, we do the same here |
353 | | // for consistency. |
354 | 4 | eof_reached = true; |
355 | 4 | continue; |
356 | 5 | } else { |
357 | 5 | // Not EOF, but we've now received our first chunk. |
358 | 5 | is_first_chunk = false; |
359 | 5 | } |
360 | 5 | |
361 | 5 | // Queue the append, but don't execute until we've received all the chunks. |
362 | 5 | pipe_size += chunk.len(); |
363 | 5 | pipe.append::<(), _, _>(&temp_key, chunk) |
364 | 0 | .await |
365 | 5 | .err_tip(|| "Failed to append to temp key in RedisStore::update"0 )?0 ; |
366 | | |
367 | | // Opportunistically grab any other chunks already in the reader. |
368 | 5 | while let Some(chunk0 ) = reader Branch (368:23): [True: 0, False: 5]
Branch (368:23): [Folded - Ignored]
|
369 | 5 | .try_recv() |
370 | 5 | .transpose() |
371 | 5 | .err_tip(|| "Failed to reach chunk in update in redis store"0 )?0 |
372 | | { |
373 | 0 | if chunk.is_empty() { Branch (373:20): [True: 0, False: 0]
Branch (373:20): [Folded - Ignored]
|
374 | 0 | eof_reached = true; |
375 | 0 | break; |
376 | | } else { |
377 | 0 | pipe_size += chunk.len(); |
378 | 0 | pipe.append::<(), _, _>(&temp_key, chunk) |
379 | 0 | .await |
380 | 0 | .err_tip(|| "Failed to append to temp key in RedisStore::update")?; |
381 | | } |
382 | | |
383 | | // Stop appending if the pipeline is already holding 5MB of data. |
384 | 0 | if pipe_size >= MAX_PIPE_SIZE { Branch (384:20): [True: 0, False: 0]
Branch (384:20): [Folded - Ignored]
|
385 | 0 | break; |
386 | 0 | } |
387 | | } |
388 | | |
389 | | // We've exhausted the reader (or hit the 5MB cap), but more data is expected. |
390 | | // Executing the queued commands appends the data we just received to the temp key. |
391 | 5 | pipe.all::<()>() |
392 | 5 | .await |
393 | 5 | .err_tip(|| "Failed to append to temporary key in RedisStore::update"0 )?0 ; |
394 | | } |
395 | | |
396 | | // Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost. |
397 | 4 | pipe.rename::<(), _, _>(&temp_key, final_key.as_ref()) |
398 | 0 | .await |
399 | 4 | .err_tip(|| "While queueing key rename in RedisStore::update()"0 )?0 ; |
400 | 4 | pipe.all::<()>() |
401 | 4 | .await |
402 | 4 | .err_tip(|| "While renaming key in RedisStore::update()"0 )?0 ; |
403 | | |
404 | | // If we have a publish channel configured, send a notice that the key has been set. |
405 | 4 | if let Some(pub_sub_channel0 ) = &self.pub_sub_channel { Branch (405:16): [True: 0, False: 4]
Branch (405:16): [Folded - Ignored]
|
406 | 0 | return Ok(client.publish(pub_sub_channel, final_key.as_ref()).await?); |
407 | 4 | }; |
408 | 4 | |
409 | 4 | Ok(()) |
410 | 14 | } |
411 | | |
412 | | async fn get_part( |
413 | | self: Pin<&Self>, |
414 | | key: StoreKey<'_>, |
415 | | writer: &mut DropCloserWriteHalf, |
416 | | offset: u64, |
417 | | length: Option<u64>, |
418 | 5 | ) -> Result<(), Error> { |
419 | 5 | let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize"0 )?0 ; |
420 | 5 | let length = length |
421 | 5 | .map(|v| usize::try_from(v).err_tip(4 || "Could not convert length to usize"0 )4 ) |
422 | 5 | .transpose()?0 ; |
423 | | |
424 | | // To follow RBE spec we need to consider any digest's with |
425 | | // zero size to be existing. |
426 | 5 | if is_zero_digest(key.borrow()) { Branch (426:12): [True: 0, False: 5]
Branch (426:12): [Folded - Ignored]
|
427 | 0 | return writer |
428 | 0 | .send_eof() |
429 | 0 | .err_tip(|| "Failed to send zero EOF in redis store get_part"); |
430 | 5 | } |
431 | 5 | |
432 | 5 | let client = self.client_pool.next(); |
433 | 5 | let encoded_key = self.encode_key(&key); |
434 | 5 | let encoded_key = encoded_key.as_ref(); |
435 | 5 | |
436 | 5 | // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we |
437 | 5 | // do math with indices we change them to be exclusive at the end. |
438 | 5 | |
439 | 5 | // We want to read the data at the key from `offset` to `offset + length`. |
440 | 5 | let data_start = offset; |
441 | 5 | let data_end = data_start |
442 | 5 | .saturating_add(length.unwrap_or(isize::MAX as usize)) |
443 | 5 | .saturating_sub(1); |
444 | 5 | |
445 | 5 | // And we don't ever want to read more than `READ_CHUNK_SIZE` bytes at a time, so we'll need to iterate. |
446 | 5 | let mut chunk_start = data_start; |
447 | 5 | let mut chunk_end = cmp::min(data_start.saturating_add(READ_CHUNK_SIZE) - 1, data_end); |
448 | | |
449 | | loop { |
450 | 6 | let chunk: Bytes = client |
451 | 6 | .getrange(encoded_key, chunk_start, chunk_end) |
452 | 6 | .await |
453 | 6 | .err_tip(|| "In RedisStore::get_part::getrange"0 )?0 ; |
454 | | |
455 | 6 | let didnt_receive_full_chunk = chunk.len() < READ_CHUNK_SIZE; |
456 | 6 | let reached_end_of_data = chunk_end == data_end; |
457 | 6 | |
458 | 6 | if didnt_receive_full_chunk || reached_end_of_data1 { Branch (458:16): [True: 5, False: 1]
Branch (458:44): [True: 0, False: 1]
Branch (458:16): [Folded - Ignored]
Branch (458:44): [Folded - Ignored]
|
459 | 5 | if !chunk.is_empty() { Branch (459:20): [True: 4, False: 1]
Branch (459:20): [Folded - Ignored]
|
460 | 4 | writer |
461 | 4 | .send(chunk) |
462 | 0 | .await |
463 | 4 | .err_tip(|| "Failed to write data in RedisStore::get_part"0 )?0 ; |
464 | 1 | } |
465 | | |
466 | 5 | break; // No more data to read. |
467 | 1 | } |
468 | 1 | |
469 | 1 | // We received a full chunk's worth of data, so write it... |
470 | 1 | writer |
471 | 1 | .send(chunk) |
472 | 0 | .await |
473 | 1 | .err_tip(|| "Failed to write data in RedisStore::get_part"0 )?0 ; |
474 | | |
475 | | // ...and go grab the next chunk. |
476 | 1 | chunk_start = chunk_end + 1; |
477 | 1 | chunk_end = cmp::min(chunk_start.saturating_add(READ_CHUNK_SIZE) - 1, data_end); |
478 | | } |
479 | | |
480 | | // If we didn't write any data, check if the key exists, if not return a NotFound error. |
481 | | // This is required by spec. |
482 | 5 | if writer.get_bytes_written() == 0 { Branch (482:12): [True: 1, False: 4]
Branch (482:12): [Folded - Ignored]
|
483 | | // We're supposed to read 0 bytes, so just check if the key exists. |
484 | 1 | let exists = client |
485 | 1 | .exists::<bool, _>(encoded_key) |
486 | 1 | .await |
487 | 1 | .err_tip(|| "In RedisStore::get_part::zero_exists"0 )?0 ; |
488 | | |
489 | 1 | if !exists { Branch (489:16): [True: 1, False: 0]
Branch (489:16): [Folded - Ignored]
|
490 | 1 | return Err(make_err!( |
491 | 1 | Code::NotFound, |
492 | 1 | "Data not found in Redis store for digest: {key:?}" |
493 | 1 | )); |
494 | 0 | } |
495 | 4 | } |
496 | | |
497 | 4 | writer |
498 | 4 | .send_eof() |
499 | 4 | .err_tip(|| "Failed to write EOF in redis store get_part"0 ) |
500 | 10 | } |
501 | | |
502 | 0 | fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver { |
503 | 0 | self |
504 | 0 | } |
505 | | |
506 | 0 | fn as_any(&self) -> &(dyn std::any::Any + Sync + Send) { |
507 | 0 | self |
508 | 0 | } |
509 | | |
510 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send> { |
511 | 0 | self |
512 | 0 | } |
513 | | |
514 | 0 | fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) { |
515 | 0 | registry.register_indicator(self); |
516 | 0 | } |
517 | | } |
518 | | |
519 | | #[async_trait] |
520 | | impl HealthStatusIndicator for RedisStore { |
521 | 0 | fn get_name(&self) -> &'static str { |
522 | 0 | "RedisStore" |
523 | 0 | } |
524 | | |
525 | 0 | async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { |
526 | 0 | StoreDriver::check_health(Pin::new(self), namespace).await |
527 | 0 | } |
528 | | } |
529 | | |
// -------------------------------------------------------------------
// Below this line are specific to the redis scheduler implementation.
// -------------------------------------------------------------------

/// The maximum number of results to return per cursor.
const MAX_COUNT_PER_CURSOR: u64 = 256;
/// The time in milliseconds that a redis cursor can be idle before it is closed.
const CURSOR_IDLE_MS: u64 = 2_000;
/// The name of the field in the Redis hash that stores the data.
const DATA_FIELD_NAME: &str = "data";
/// The name of the field in the Redis hash that stores the version.
const VERSION_FIELD_NAME: &str = "version";
/// The time to live of indexes in seconds. After this time redis may delete the index.
const INDEX_TTL_S: u64 = 60 * 60 * 24; // 24 hours.
544 | | |
/// String of the `FT.CREATE` command used to create the index template. It is done this
/// way so we can use it in both const (compile time) functions and runtime functions.
/// This is a macro because we need to use it in multiple places that sometimes require the
/// data as different data types (specifically for rust's format_args! macro).
///
/// The template has three `{}` placeholders, in order: the position of the index name,
/// the position of the key prefix (after `PREFIX 1`) and the position of the field
/// name (after `SCHEMA`).
macro_rules! get_create_index_template {
    () => {
        "FT.CREATE {} ON HASH PREFIX 1 {} NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA {} TAG CASESENSITIVE SORTABLE"
    }
}
554 | | |
/// Lua script to set a key if the version matches.
/// Args:
///   KEYS[1]: The key where the version is stored.
///   ARGV[1]: The expected version.
///   ARGV[2]: The new data.
///   ARGV[3*]: Key-value pairs of additional data to include.
/// Returns:
///   The new version if the version matches. `0` is returned if the
///   version did not match and the value was not set.
///
/// The script first increments the version field, and undoes that increment
/// again when the pre-increment version differs from the expected one.
const LUA_VERSION_SET_SCRIPT: &str = formatcp!(
    r#"
local key = KEYS[1]
local expected_version = tonumber(ARGV[1])
local new_data = ARGV[2]
local new_version = redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', 1)
local i
local indexes = {{}}

if new_version-1 ~= expected_version then
redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', -1)
return 0
end
-- Skip first 2 argvs, as they are known inputs.
-- Remember: Lua is 1-indexed.
for i=3, #ARGV do
indexes[i-2] = ARGV[i]
end

-- In testing we witnessed redis sometimes not update our FT indexes
-- resulting in stale data. It appears if we delete our keys then insert
-- them again it works and reduces risk significantly.
redis.call('DEL', key)
redis.call('HSET', key, '{DATA_FIELD_NAME}', new_data, '{VERSION_FIELD_NAME}', new_version, unpack(indexes))

return new_version
"#
);
592 | | |
593 | | /// Compile-time fingerprint of the `FT.CREATE` command used to create the index template. |
594 | | /// This is a simple CRC32 checksum of the command string. We don't care about it actually |
595 | | /// being a valid CRC32 checksum, just that it's a unique identifier with a low chance of |
596 | | /// collision. |
597 | 13 | const fn fingerprint_create_index_template() -> u32 { |
598 | | const POLY: u32 = 0xEDB8_8320; |
599 | | const DATA: &[u8] = get_create_index_template!().as_bytes(); |
600 | 13 | let mut crc = 0xFFFF_FFFF; |
601 | 13 | let mut i = 0; |
602 | 1.32k | while i < DATA.len() { Branch (602:11): [True: 1.31k, False: 13]
Branch (602:11): [Folded - Ignored]
|
603 | 1.31k | let byte = DATA[i]; |
604 | 1.31k | crc ^= byte as u32; |
605 | 1.31k | |
606 | 1.31k | let mut j = 0; |
607 | 11.8k | while j < 8 { Branch (607:15): [True: 10.5k, False: 1.31k]
Branch (607:15): [Folded - Ignored]
|
608 | 10.5k | crc = if crc & 1 != 0 { Branch (608:22): [True: 5.01k, False: 5.48k]
Branch (608:22): [Folded - Ignored]
|
609 | 5.01k | (crc >> 1) ^ POLY |
610 | | } else { |
611 | 5.48k | crc >> 1 |
612 | | }; |
613 | 10.5k | j += 1; |
614 | | } |
615 | 1.31k | i += 1; |
616 | | } |
617 | 13 | crc |
618 | 13 | } |
619 | | |
/// Get the name of the index to create for the given field.
/// This will add some prefix data to the name to try and ensure
/// if the index definition changes, the name will get a new name.
///
/// Expands to a `format_args!` value of the form `{prefix}_{field}_{fingerprint}`,
/// where the fingerprint is `fingerprint_create_index_template()` rendered as eight
/// lowercase hex digits.
macro_rules! get_index_name {
    ($prefix:expr, $field:expr) => {
        format_args!(
            "{}_{}_{:08x}",
            $prefix,
            $field,
            fingerprint_create_index_template(),
        )
    };
}
633 | | |
/// Validates that `s` is safe to use as part of a Redis key.
///
/// The string is never modified: it is returned as-is when every byte is an ASCII
/// letter, an ASCII digit or `_`, and `None` is returned otherwise. The empty
/// string is accepted.
const fn try_sanitize(s: &str) -> Option<&str> {
    // Note: iterators and `for` loops are not usable in `const fn`, so we walk the
    // bytes by index. Keeping this `const` lets the compiler evaluate the check at
    // compile time when the input is a constant.
    let bytes = s.as_bytes();
    let mut idx = 0;
    while idx < bytes.len() {
        let byte = bytes[idx];
        let allowed = byte.is_ascii_alphanumeric() || byte == b'_';
        if !allowed {
            return None;
        }
        idx += 1;
    }
    Some(s)
}
655 | | |
/// An individual subscription to a key in Redis.
pub struct RedisSubscription {
    /// Watch receiver for this subscription; its current value is the subscribed key.
    /// Set to `None` once the subscription has been dropped.
    receiver: Option<tokio::sync::watch::Receiver<String>>,
    /// Weak handle to the shared map of subscribed keys, used on drop to remove
    /// the entry for this key once it has no receivers left.
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
}
661 | | |
662 | | impl SchedulerSubscription for RedisSubscription { |
663 | | /// Wait for the subscription key to change. |
664 | 7 | async fn changed(&mut self) -> Result<(), Error> { |
665 | 7 | let receiver = self |
666 | 7 | .receiver |
667 | 7 | .as_mut() |
668 | 7 | .ok_or_else(|| make_err!(Code::Internal, "In RedisSubscription::changed::as_mut")0 )?0 ; |
669 | 7 | receiver |
670 | 7 | .changed() |
671 | 5 | .await |
672 | 6 | .map_err(|_| make_err!(Code::Internal, "In RedisSubscription::changed::changed")0 ) |
673 | 6 | } |
674 | | } |
675 | | |
676 | | // If the subscription is dropped, we need to possibly remove the key from the |
677 | | // subscribed keys map. |
678 | | impl Drop for RedisSubscription { |
679 | 2 | fn drop(&mut self) { |
680 | 2 | let Some(receiver) = self.receiver.take() else { Branch (680:13): [True: 2, False: 0]
Branch (680:13): [Folded - Ignored]
|
681 | 0 | event!( |
682 | 0 | Level::WARN, |
683 | 0 | "RedisSubscription has already been dropped, nothing to do." |
684 | | ); |
685 | 0 | return; // Already dropped, nothing to do. |
686 | | }; |
687 | 2 | let key = receiver.borrow().clone(); |
688 | 2 | // IMPORTANT: This must be dropped before receiver_count() is called. |
689 | 2 | drop(receiver); |
690 | 2 | let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else { Branch (690:13): [True: 2, False: 0]
Branch (690:13): [Folded - Ignored]
|
691 | 0 | return; // Already dropped, nothing to do. |
692 | | }; |
693 | 2 | let mut subscribed_keys = subscribed_keys.write(); |
694 | 2 | let Some(value) = subscribed_keys.get(&key) else { Branch (694:13): [True: 2, False: 0]
Branch (694:13): [Folded - Ignored]
|
695 | 0 | event!( |
696 | 0 | Level::ERROR, |
697 | 0 | "Key {key} was not found in subscribed keys when checking if it should be removed." |
698 | | ); |
699 | 0 | return; |
700 | | }; |
701 | | // If we have no receivers, cleanup the entry from our map. |
702 | 2 | if value.receiver_count() == 0 { Branch (702:12): [True: 2, False: 0]
Branch (702:12): [Folded - Ignored]
|
703 | 2 | subscribed_keys.remove(key); |
704 | 2 | }0 |
705 | 2 | } |
706 | | } |
707 | | |
/// A publisher for a key in Redis.
struct RedisSubscriptionPublisher {
    /// Sending half of the watch channel shared by every subscription to this
    /// key. Wrapped in a `Mutex` so it can be used through a shared reference
    /// (see the TODO in `notify`).
    sender: Mutex<tokio::sync::watch::Sender<String>>,
}
712 | | |
713 | | impl RedisSubscriptionPublisher { |
714 | 2 | fn new( |
715 | 2 | key: String, |
716 | 2 | weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>, |
717 | 2 | ) -> (Self, RedisSubscription) { |
718 | 2 | let (sender, receiver) = tokio::sync::watch::channel(key); |
719 | 2 | let publisher = Self { |
720 | 2 | sender: Mutex::new(sender), |
721 | 2 | }; |
722 | 2 | let subscription = RedisSubscription { |
723 | 2 | receiver: Some(receiver), |
724 | 2 | weak_subscribed_keys, |
725 | 2 | }; |
726 | 2 | (publisher, subscription) |
727 | 2 | } |
728 | | |
729 | 0 | fn subscribe( |
730 | 0 | &self, |
731 | 0 | weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>, |
732 | 0 | ) -> RedisSubscription { |
733 | 0 | let receiver = self.sender.lock().subscribe(); |
734 | 0 | RedisSubscription { |
735 | 0 | receiver: Some(receiver), |
736 | 0 | weak_subscribed_keys, |
737 | 0 | } |
738 | 0 | } |
739 | | |
740 | 2 | fn receiver_count(&self) -> usize { |
741 | 2 | self.sender.lock().receiver_count() |
742 | 2 | } |
743 | | |
744 | 4 | fn notify(&self) { |
745 | 4 | // TODO(https://github.com/sile/patricia_tree/issues/40) When this is addressed |
746 | 4 | // we can remove the `Mutex` and use the mutable iterator directly. |
747 | 4 | self.sender.lock().send_modify(|_| {}); |
748 | 4 | } |
749 | | } |
750 | | |
/// Keeps track of key subscriptions and notifies them when a key is reported
/// as changed, either over the Redis pub/sub channel or through the test hook.
pub struct RedisSubscriptionManager {
    /// Publishers indexed by subscribed key. The background task matches
    /// incoming keys against this map with `common_prefix_values`.
    subscribed_keys: Arc<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
    /// Sender used by `notify_for_test` to hand a key to the background task
    /// without going through Redis.
    tx_for_test: tokio::sync::mpsc::UnboundedSender<String>,
    /// Handle of the background task spawned in `new`.
    /// NOTE(review): presumably stops the task when dropped - confirm against
    /// `JoinHandleDropGuard`.
    _subscription_spawn: JoinHandleDropGuard<()>,
}
756 | | |
757 | | impl RedisSubscriptionManager { |
758 | 1 | pub fn new(subscribe_client: SubscriberClient, pub_sub_channel: String) -> Self { |
759 | 1 | let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new())); |
760 | 1 | let subscribed_keys_weak = Arc::downgrade(&subscribed_keys); |
761 | 1 | let (tx_for_test, mut rx_for_test) = tokio::sync::mpsc::unbounded_channel(); |
762 | 1 | Self { |
763 | 1 | subscribed_keys, |
764 | 1 | tx_for_test, |
765 | 1 | _subscription_spawn: spawn!("redis_subscribe_spawn", async move { |
766 | 1 | let mut rx = subscribe_client.message_rx(); |
767 | | loop { |
768 | 1 | if let Err(e0 ) = subscribe_client.subscribe(&pub_sub_channel).await { Branch (768:28): [True: 0, False: 1]
Branch (768:28): [Folded - Ignored]
|
769 | 0 | event!(Level::ERROR, "Error subscribing to pattern - {e}"); |
770 | 0 | return; |
771 | 1 | } |
772 | 1 | let mut reconnect_rx = subscribe_client.reconnect_rx(); |
773 | 1 | let reconnect_fut = reconnect_rx.recv().fuse(); |
774 | 1 | tokio::pin!(reconnect_fut); |
775 | | loop { |
776 | 4 | let key3 = select! { |
777 | 4 | value3 = rx_for_test.recv() => { |
778 | 3 | let Some(value) = value else { Branch (778:37): [True: 3, False: 0]
Branch (778:37): [Folded - Ignored]
|
779 | 0 | unreachable!("Channel should never close"); |
780 | | }; |
781 | 3 | value.into() |
782 | | }, |
783 | 4 | msg0 = rx.recv() => { |
784 | 0 | match msg { |
785 | 0 | Ok(msg) => { |
786 | 0 | if let RedisValue::String(s) = msg.value { Branch (786:48): [True: 0, False: 0]
Branch (786:48): [Folded - Ignored]
|
787 | 0 | s |
788 | | } else { |
789 | 0 | event!(Level::ERROR, "Received non-string message in RedisSubscriptionManager"); |
790 | 0 | continue; |
791 | | } |
792 | | }, |
793 | 0 | Err(e) => { |
794 | 0 | // Check to see if our parent has been dropped and if so kill spawn. |
795 | 0 | if subscribed_keys_weak.upgrade().is_none() { Branch (795:44): [True: 0, False: 0]
Branch (795:44): [Folded - Ignored]
|
796 | 0 | event!(Level::WARN, "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"); |
797 | 0 | return; |
798 | 0 | }; |
799 | 0 | event!(Level::ERROR, "Error receiving message in RedisSubscriptionManager reconnecting and flagging everything changed - {e}"); |
800 | 0 | break; |
801 | | } |
802 | | } |
803 | | }, |
804 | 4 | _ = &mut reconnect_fut => { |
805 | 0 | event!(Level::WARN, "Redis reconnected flagging all subscriptions as changed and resuming"); |
806 | 0 | break; |
807 | | } |
808 | | }; |
809 | 3 | let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else { Branch (809:29): [True: 3, False: 0]
Branch (809:29): [Folded - Ignored]
|
810 | 0 | event!(Level::WARN, "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"); |
811 | 0 | return; |
812 | | }; |
813 | 3 | let subscribed_keys_mux = subscribed_keys.read(); |
814 | 3 | subscribed_keys_mux |
815 | 3 | .common_prefix_values(&*key) |
816 | 3 | .for_each(RedisSubscriptionPublisher::notify); |
817 | | } |
818 | | // Sleep for a small amount of time to ensure we don't reconnect too quickly. |
819 | 0 | sleep(Duration::from_secs(1)).await; |
820 | | // If we reconnect or lag behind we might have had dirty keys, so we need to |
821 | | // flag all of them as changed. |
822 | 0 | let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else { Branch (822:25): [True: 0, False: 0]
Branch (822:25): [Folded - Ignored]
|
823 | 0 | event!(Level::WARN, "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"); |
824 | 0 | return; |
825 | | }; |
826 | 0 | let subscribed_keys_mux = subscribed_keys.read(); |
827 | 0 | // Just in case also get a new receiver. |
828 | 0 | rx = subscribe_client.message_rx(); |
829 | 0 | // Drop all buffered messages, then flag everything as changed. |
830 | 0 | rx.resubscribe(); |
831 | 0 | for publisher in subscribed_keys_mux.values() { |
832 | 0 | publisher.notify(); |
833 | 0 | } |
834 | | } |
835 | 1 | }0 ), |
836 | | } |
837 | 1 | } |
838 | | } |
839 | | |
840 | | impl SchedulerSubscriptionManager for RedisSubscriptionManager { |
841 | | type Subscription = RedisSubscription; |
842 | | |
843 | 3 | fn notify_for_test(&self, value: String) { |
844 | 3 | self.tx_for_test.send(value).unwrap(); |
845 | 3 | } |
846 | | |
847 | 2 | fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error> |
848 | 2 | where |
849 | 2 | K: SchedulerStoreKeyProvider, |
850 | 2 | { |
851 | 2 | let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys); |
852 | 2 | let mut subscribed_keys = self.subscribed_keys.write(); |
853 | 2 | let key = key.get_key(); |
854 | 2 | let key_str = key.as_str(); |
855 | 2 | let mut subscription = if let Some(publisher0 ) = subscribed_keys.get(&key_str) { Branch (855:39): [True: 0, False: 0]
Branch (855:39): [Folded - Ignored]
Branch (855:39): [Folded - Ignored]
Branch (855:39): [True: 0, False: 2]
|
856 | 0 | publisher.subscribe(weak_subscribed_keys) |
857 | | } else { |
858 | 2 | let (publisher, subscription) = |
859 | 2 | RedisSubscriptionPublisher::new(key_str.to_string(), weak_subscribed_keys); |
860 | 2 | subscribed_keys.insert(key_str, publisher); |
861 | 2 | subscription |
862 | | }; |
863 | 2 | subscription |
864 | 2 | .receiver |
865 | 2 | .as_mut() |
866 | 2 | .ok_or_else(|| { |
867 | 0 | make_err!( |
868 | 0 | Code::Internal, |
869 | 0 | "Receiver should be set in RedisSubscriptionManager::subscribe" |
870 | 0 | ) |
871 | 2 | })?0 |
872 | 2 | .mark_changed(); |
873 | 2 | |
874 | 2 | Ok(subscription) |
875 | 2 | } |
876 | | } |
877 | | |
878 | | impl SchedulerStore for RedisStore { |
879 | | type SubscriptionManager = RedisSubscriptionManager; |
880 | | |
881 | 3 | fn subscription_manager(&self) -> Result<Arc<RedisSubscriptionManager>, Error> { |
882 | 3 | let mut subscription_manager = self.subscription_manager.lock(); |
883 | 3 | match &*subscription_manager { |
884 | 2 | Some(subscription_manager) => Ok(subscription_manager.clone()), |
885 | | None => { |
886 | 1 | let Some(pub_sub_channel) = &self.pub_sub_channel else { Branch (886:21): [True: 1, False: 0]
Branch (886:21): [Folded - Ignored]
|
887 | 0 | return Err(make_input_err!("RedisStore must have a pubsub channel for a Redis Scheduler if using subscriptions")); |
888 | | }; |
889 | 1 | let sub = Arc::new(RedisSubscriptionManager::new( |
890 | 1 | self.subscriber_client.clone(), |
891 | 1 | pub_sub_channel.clone(), |
892 | 1 | )); |
893 | 1 | *subscription_manager = Some(sub.clone()); |
894 | 1 | Ok(sub) |
895 | | } |
896 | | } |
897 | 3 | } |
898 | | |
899 | 3 | async fn update_data<T>(&self, data: T) -> Result<Option<u64>, Error> |
900 | 3 | where |
901 | 3 | T: SchedulerStoreDataProvider |
902 | 3 | + SchedulerStoreKeyProvider |
903 | 3 | + SchedulerCurrentVersionProvider |
904 | 3 | + Send, |
905 | 3 | { |
906 | 3 | let key = data.get_key(); |
907 | 3 | let key = self.encode_key(&key); |
908 | 3 | let client = self.client_pool.next(); |
909 | 3 | let maybe_index = data.get_indexes().err_tip(|| { |
910 | 0 | format!("Err getting index in RedisStore::update_data::versioned for {key:?}") |
911 | 3 | })?0 ; |
912 | 3 | if <T as SchedulerStoreKeyProvider>::Versioned::VALUE { Branch (912:12): [True: 0, False: 0]
Branch (912:12): [True: 0, False: 0]
Branch (912:12): [Folded - Ignored]
Branch (912:12): [Folded - Ignored]
Branch (912:12): [True: 0, False: 1]
Branch (912:12): [True: 2, False: 0]
|
913 | 2 | let current_version = data.current_version(); |
914 | 2 | let data = data.try_into_bytes().err_tip(|| { |
915 | 0 | format!("Could not convert value to bytes in RedisStore::update_data::versioned for {key:?}") |
916 | 2 | })?0 ; |
917 | 2 | let mut argv = Vec::with_capacity(3 + maybe_index.len() * 2); |
918 | 2 | argv.push(Bytes::from(format!("{current_version}"))); |
919 | 2 | argv.push(data); |
920 | 6 | for (name, value4 ) in maybe_index { |
921 | 4 | argv.push(Bytes::from_static(name.as_bytes())); |
922 | 4 | argv.push(value); |
923 | 4 | } |
924 | 2 | let new_version = self |
925 | 2 | .update_if_version_matches_script |
926 | 2 | .evalsha_with_reload::<u64, _, Vec<Bytes>>(client, vec![key.as_ref()], argv) |
927 | 2 | .await |
928 | 2 | .err_tip(|| format!("In RedisStore::update_data::versioned for {key:?}")0 )?0 ; |
929 | 2 | if new_version == 0 { Branch (929:16): [True: 0, False: 0]
Branch (929:16): [True: 0, False: 0]
Branch (929:16): [Folded - Ignored]
Branch (929:16): [Folded - Ignored]
Branch (929:16): [True: 0, False: 0]
Branch (929:16): [True: 0, False: 2]
|
930 | 0 | return Ok(None); |
931 | 2 | } |
932 | | // If we have a publish channel configured, send a notice that the key has been set. |
933 | 2 | if let Some(pub_sub_channel) = &self.pub_sub_channel { Branch (933:20): [True: 0, False: 0]
Branch (933:20): [True: 0, False: 0]
Branch (933:20): [Folded - Ignored]
Branch (933:20): [Folded - Ignored]
Branch (933:20): [True: 0, False: 0]
Branch (933:20): [True: 2, False: 0]
|
934 | 2 | return Ok(client.publish(pub_sub_channel, key.as_ref()).await?0 ); |
935 | 0 | }; |
936 | 0 | Ok(Some(new_version)) |
937 | | } else { |
938 | 1 | let data = data.try_into_bytes().err_tip(|| { |
939 | 0 | format!("Could not convert value to bytes in RedisStore::update_data::noversion for {key:?}") |
940 | 1 | })?0 ; |
941 | 1 | let mut fields = RedisMap::new(); |
942 | 1 | fields.reserve(1 + maybe_index.len()); |
943 | 1 | fields.insert(DATA_FIELD_NAME.into(), data.into()); |
944 | 1 | for (name, value0 ) in maybe_index { |
945 | 0 | fields.insert(name.into(), value.into()); |
946 | 0 | } |
947 | 1 | client |
948 | 1 | .hset::<(), _, _>(key.as_ref(), fields) |
949 | 1 | .await |
950 | 1 | .err_tip(|| format!("In RedisStore::update_data::noversion for {key:?}")0 )?0 ; |
951 | | // If we have a publish channel configured, send a notice that the key has been set. |
952 | 1 | if let Some(pub_sub_channel) = &self.pub_sub_channel { Branch (952:20): [True: 0, False: 0]
Branch (952:20): [True: 0, False: 0]
Branch (952:20): [Folded - Ignored]
Branch (952:20): [Folded - Ignored]
Branch (952:20): [True: 1, False: 0]
Branch (952:20): [True: 0, False: 0]
|
953 | 1 | return Ok(client.publish(pub_sub_channel, key.as_ref()).await?0 ); |
954 | 0 | }; |
955 | 0 | Ok(Some(0)) // Always use "0" version since this is not a versioned request. |
956 | | } |
957 | 3 | } |
958 | | |
959 | 1 | async fn search_by_index_prefix<K>( |
960 | 1 | &self, |
961 | 1 | index: K, |
962 | 1 | ) -> Result< |
963 | 1 | impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send, |
964 | 1 | Error, |
965 | 1 | > |
966 | 1 | where |
967 | 1 | K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send, |
968 | 1 | { |
969 | 1 | let index_value_prefix = index.index_value_prefix(); |
970 | 2 | let run_ft_aggregate = || { |
971 | 2 | let client = self.client_pool.next().clone(); |
972 | 2 | let sanitized_field = try_sanitize(index_value_prefix.as_ref()).err_tip(|| { |
973 | 0 | format!( |
974 | 0 | "In RedisStore::search_by_index_prefix::try_sanitize - {index_value_prefix:?}" |
975 | 0 | ) |
976 | 2 | })?0 ; |
977 | 2 | Ok::<_, Error>(async move { |
978 | 2 | ft_aggregate( |
979 | 2 | client, |
980 | 2 | format!("{}", get_index_name!(K::KEY_PREFIX, K::INDEX_NAME)), |
981 | 2 | format!("@{}:{{ {}* }}", K::INDEX_NAME, sanitized_field), |
982 | 2 | fred::types::FtAggregateOptions { |
983 | 2 | load: Some(fred::types::Load::Some(vec![ |
984 | 2 | fred::types::SearchField { |
985 | 2 | identifier: DATA_FIELD_NAME.into(), |
986 | 2 | property: None, |
987 | 2 | }, |
988 | 2 | fred::types::SearchField { |
989 | 2 | identifier: VERSION_FIELD_NAME.into(), |
990 | 2 | property: None, |
991 | 2 | }, |
992 | 2 | ])), |
993 | 2 | cursor: Some(fred::types::WithCursor { |
994 | 2 | count: Some(MAX_COUNT_PER_CURSOR), |
995 | 2 | max_idle: Some(CURSOR_IDLE_MS), |
996 | 2 | }), |
997 | 2 | pipeline: vec![fred::types::AggregateOperation::SortBy { |
998 | 2 | properties: vec![( |
999 | 2 | format!("@{}", K::INDEX_NAME).into(), |
1000 | 2 | fred::types::SortOrder::Asc, |
1001 | 2 | )], |
1002 | 2 | max: None, |
1003 | 2 | }], |
1004 | 2 | ..Default::default() |
1005 | 2 | }, |
1006 | 2 | ) |
1007 | 2 | .await |
1008 | 2 | }) |
1009 | 2 | }; |
1010 | 1 | let stream = if let Ok(stream0 ) = run_ft_aggregate()?0 .await { Branch (1010:29): [True: 0, False: 0]
Branch (1010:29): [True: 0, False: 0]
Branch (1010:29): [Folded - Ignored]
Branch (1010:29): [Folded - Ignored]
Branch (1010:29): [True: 0, False: 1]
|
1011 | 0 | stream |
1012 | | } else { |
1013 | 1 | let create_result = self |
1014 | 1 | .client_pool |
1015 | 1 | .next() |
1016 | 1 | .ft_create::<(), _>( |
1017 | 1 | format!("{}", get_index_name!(K::KEY_PREFIX, K::INDEX_NAME)), |
1018 | 1 | FtCreateOptions { |
1019 | 1 | on: Some(fred::types::IndexKind::Hash), |
1020 | 1 | prefixes: vec![K::KEY_PREFIX.into()], |
1021 | 1 | nohl: true, |
1022 | 1 | nofields: true, |
1023 | 1 | nofreqs: true, |
1024 | 1 | nooffsets: true, |
1025 | 1 | temporary: Some(INDEX_TTL_S), |
1026 | 1 | ..Default::default() |
1027 | 1 | }, |
1028 | 1 | vec![SearchSchema { |
1029 | 1 | field_name: K::INDEX_NAME.into(), |
1030 | 1 | alias: None, |
1031 | 1 | kind: SearchSchemaKind::Tag { |
1032 | 1 | sortable: true, |
1033 | 1 | unf: false, |
1034 | 1 | separator: None, |
1035 | 1 | casesensitive: false, |
1036 | 1 | withsuffixtrie: false, |
1037 | 1 | noindex: false, |
1038 | 1 | }, |
1039 | 1 | }], |
1040 | 1 | ) |
1041 | 1 | .await |
1042 | 1 | .err_tip(|| { |
1043 | 0 | format!( |
1044 | 0 | "Error with ft_create in RedisStore::search_by_index_prefix({})", |
1045 | 0 | get_index_name!(K::KEY_PREFIX, K::INDEX_NAME), |
1046 | 0 | ) |
1047 | 1 | }); |
1048 | 1 | let run_result = run_ft_aggregate()?0 .await.err_tip(|| { |
1049 | 0 | format!( |
1050 | 0 | "Error with second ft_aggregate in RedisStore::search_by_index_prefix({})", |
1051 | 0 | get_index_name!(K::KEY_PREFIX, K::INDEX_NAME), |
1052 | 0 | ) |
1053 | 1 | }); |
1054 | 1 | // Creating the index will race which is ok. If it fails to create, we only |
1055 | 1 | // error if the second ft_aggregate call fails and fails to create. |
1056 | 1 | match run_result { |
1057 | 1 | Ok(stream) => stream, |
1058 | 0 | Err(e) => return create_result.merge(Err(e)), |
1059 | | } |
1060 | | }; |
1061 | 1 | Ok(stream.map(|result| {0 |
1062 | 0 | let mut redis_map = |
1063 | 0 | result.err_tip(|| "Error in stream of in RedisStore::search_by_index_prefix")?; |
1064 | 0 | let bytes_data = redis_map |
1065 | 0 | .remove(&RedisKey::from_static_str(DATA_FIELD_NAME)) |
1066 | 0 | .err_tip(|| "Missing data field in RedisStore::search_by_index_prefix")? |
1067 | 0 | .into_bytes() |
1068 | 0 | .err_tip(|| { |
1069 | 0 | formatcp!("'{DATA_FIELD_NAME}' is not Bytes in RedisStore::search_by_index_prefix::into_bytes") |
1070 | 0 | })?; |
1071 | 0 | let version = if <K as SchedulerIndexProvider>::Versioned::VALUE { Branch (1071:30): [True: 0, False: 0]
Branch (1071:30): [True: 0, False: 0]
Branch (1071:30): [Folded - Ignored]
Branch (1071:30): [Folded - Ignored]
Branch (1071:30): [True: 0, False: 0]
|
1072 | 0 | redis_map |
1073 | 0 | .remove(&RedisKey::from_static_str(VERSION_FIELD_NAME)) |
1074 | 0 | .err_tip(|| "Missing version field in RedisStore::search_by_index_prefix")? |
1075 | 0 | .as_u64() |
1076 | 0 | .err_tip(|| { |
1077 | 0 | formatcp!("'{VERSION_FIELD_NAME}' is not u64 in RedisStore::search_by_index_prefix::as_u64") |
1078 | 0 | })? |
1079 | | } else { |
1080 | 0 | 0 |
1081 | | }; |
1082 | 0 | K::decode(version, bytes_data) |
1083 | 0 | .err_tip(|| "In RedisStore::search_by_index_prefix::decode") |
1084 | 1 | }0 )) |
1085 | 1 | } |
1086 | | |
1087 | 2 | async fn get_and_decode<K>( |
1088 | 2 | &self, |
1089 | 2 | key: K, |
1090 | 2 | ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error> |
1091 | 2 | where |
1092 | 2 | K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send, |
1093 | 2 | { |
1094 | 2 | let key = key.get_key(); |
1095 | 2 | let key = self.encode_key(&key); |
1096 | 2 | let client = self.client_pool.next(); |
1097 | 2 | let (maybe_version, maybe_data) = client |
1098 | 2 | .hmget::<(Option<u64>, Option<Bytes>), _, _>( |
1099 | 2 | key.as_ref(), |
1100 | 2 | vec![ |
1101 | 2 | RedisKey::from(VERSION_FIELD_NAME), |
1102 | 2 | RedisKey::from(DATA_FIELD_NAME), |
1103 | 2 | ], |
1104 | 2 | ) |
1105 | 2 | .await |
1106 | 2 | .err_tip(|| format!("In RedisStore::get_without_version::notversioned {key}")0 )?0 ; |
1107 | 2 | let Some(data) = maybe_data else { Branch (1107:13): [True: 0, False: 0]
Branch (1107:13): [True: 0, False: 0]
Branch (1107:13): [Folded - Ignored]
Branch (1107:13): [Folded - Ignored]
Branch (1107:13): [True: 2, False: 0]
|
1108 | 0 | return Ok(None); |
1109 | | }; |
1110 | 2 | Ok(Some(K::decode(maybe_version.unwrap_or(0), data).err_tip( |
1111 | 2 | || format!("In RedisStore::get_with_version::notversioned::decode {key}")0 , |
1112 | 2 | )?0 )) |
1113 | 2 | } |
1114 | | } |