/build/source/nativelink-store/src/redis_store.rs
Line | Count | Source |
1 | | // Copyright 2024-2026 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::cmp; |
16 | | use core::fmt::Debug; |
17 | | use core::marker::PhantomData; |
18 | | use core::ops::{Bound, RangeBounds}; |
19 | | use core::pin::Pin; |
20 | | use core::str::FromStr; |
21 | | use core::time::Duration; |
22 | | use std::borrow::Cow; |
23 | | use std::collections::HashSet; |
24 | | use std::sync::{Arc, Weak}; |
25 | | use std::time::Instant; |
26 | | |
27 | | use async_trait::async_trait; |
28 | | use bytes::Bytes; |
29 | | use const_format::formatcp; |
30 | | use futures::stream::FuturesUnordered; |
31 | | use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt, future}; |
32 | | use itertools::izip; |
33 | | use nativelink_config::stores::{RedisMode, RedisSpec}; |
34 | | use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err}; |
35 | | use nativelink_metric::MetricsComponent; |
36 | | use nativelink_redis_tester::SubscriptionManagerNotify; |
37 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
38 | | use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; |
39 | | use nativelink_util::spawn; |
40 | | use nativelink_util::store_trait::{ |
41 | | BoolValue, RemoveItemCallback, SchedulerCurrentVersionProvider, SchedulerIndexProvider, |
42 | | SchedulerStore, SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider, |
43 | | SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo, |
44 | | }; |
45 | | use nativelink_util::task::JoinHandleDropGuard; |
46 | | use parking_lot::{Mutex, RwLock}; |
47 | | use patricia_tree::StringPatriciaMap; |
48 | | use redis::aio::{ConnectionLike, ConnectionManager, ConnectionManagerConfig}; |
49 | | use redis::cluster::ClusterClient; |
50 | | use redis::cluster_async::ClusterConnection; |
51 | | use redis::sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType}; |
52 | | use redis::{ |
53 | | AsyncCommands, AsyncIter, Client, IntoConnectionInfo, PushInfo, ScanOptions, Script, Value, |
54 | | pipe, |
55 | | }; |
56 | | use tokio::select; |
57 | | use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; |
58 | | use tokio::sync::{OwnedSemaphorePermit, Semaphore}; |
59 | | use tokio::time::{sleep, timeout}; |
60 | | use tokio_stream::wrappers::UnboundedReceiverStream; |
61 | | use tracing::{debug, error, info, trace, warn}; |
62 | | use url::Url; |
63 | | use uuid::Uuid; |
64 | | |
65 | | use crate::cas_utils::is_zero_digest; |
66 | | use crate::redis_utils::{ |
67 | | FtAggregateCursor, FtAggregateOptions, FtCreateOptions, SearchSchema, ft_aggregate, ft_create, |
68 | | }; |
69 | | |
70 | | /// The default size of the read chunk when reading data from Redis. |
71 | | /// Note: If this changes it should be updated in the config documentation. |
72 | | const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024; |
73 | | |
74 | | /// The default size of the connection pool if not specified. |
75 | | /// Note: If this changes it should be updated in the config documentation. |
76 | | const DEFAULT_CONNECTION_POOL_SIZE: usize = 3; |
77 | | |
78 | | /// The default delay between retries if not specified. |
79 | | /// Note: If this changes it should be updated in the config documentation. |
80 | | const DEFAULT_RETRY_DELAY: f32 = 0.1; |
81 | | |
82 | | /// The default connection timeout in milliseconds if not specified. |
83 | | /// Note: If this changes it should be updated in the config documentation. |
84 | | const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000; |
85 | | |
86 | | /// The default command timeout in milliseconds if not specified. |
87 | | /// Note: If this changes it should be updated in the config documentation. |
88 | | const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000; |
89 | | |
90 | | /// The default `check_health` PING ceiling in milliseconds if not specified. |
91 | | /// Note: If this changes it should be updated in the config documentation. |
92 | | const DEFAULT_HEALTH_CHECK_TIMEOUT_MS: u64 = 4000; |
93 | | |
94 | | /// The default maximum number of chunk uploads per update. |
95 | | /// Note: If this changes it should be updated in the config documentation. |
96 | | pub const DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE: usize = 10; |
97 | | |
98 | | /// The default COUNT value passed when scanning keys in Redis. |
99 | | /// Note: If this changes it should be updated in the config documentation. |
100 | | const DEFAULT_SCAN_COUNT: usize = 10_000; |
101 | | |
102 | | /// The default COUNT value passed when scanning search indexes. |
103 | | /// Note: If this changes it should be updated in the config documentation. |
104 | | pub const DEFAULT_MAX_COUNT_PER_CURSOR: u64 = 1_500; |
105 | | |
106 | | const DEFAULT_CLIENT_PERMITS: usize = 500; |
107 | | |
108 | | /// A wrapper around Redis to allow it to be reconnected. |
109 | | pub trait RedisManager<C> |
110 | | where |
111 | | C: ConnectionLike + Clone, |
112 | | { |
113 | | /// Get a connection manager and a unique identifier for this connection |
114 | | /// which may be used to issue a reconnect later. |
115 | | fn get_connection(&self) -> impl Future<Output = Result<(C, Uuid), Error>> + Send; |
116 | | |
117 | | /// Reconnect if the uuid matches the uuid returned from `get_connection()`. |
118 | | fn reconnect(&self, uuid: Uuid) -> impl Future<Output = Result<(C, Uuid), Error>> + Send; |
119 | | |
120 | | /// Get an invocation of the update version script for a given `key`. |
121 | | fn update_script(&self, key: &str) -> redis::ScriptInvocation<'_>; |
122 | | |
123 | | /// Configure the connection to have a psubscribe on it and perform the |
124 | | /// subscription on reconnect. |
125 | | fn psubscribe(&self, pattern: &str) -> impl Future<Output = Result<(), Error>> + Send; |
126 | | } |
127 | | |
128 | | #[derive(Debug)] |
129 | | pub struct ClusterRedisManager<C> |
130 | | where |
131 | | C: ConnectionLike + Clone, |
132 | | { |
133 | | /// A constant Uuid; this manager never reconnects, so it never changes. |
134 | | uuid: Uuid, |
135 | | |
136 | | /// Redis script used to update a value in redis if the version matches. |
137 | | /// This is done by incrementing the version number and then setting the new |
138 | | /// data only if the version number matches the existing version number. |
139 | | update_if_version_matches_script: Script, |
140 | | |
141 | | /// The client pool connecting to the backing Redis instance(s). |
142 | | connection_manager: C, |
143 | | } |
144 | | |
145 | | impl<C> ClusterRedisManager<C> |
146 | | where |
147 | | C: ConnectionLike + Clone, |
148 | | { |
149 | 16 | pub async fn new(mut connection_manager: C) -> Result<Self, Error> { |
150 | 16 | let update_if_version_matches_script = Script::new(LUA_VERSION_SET_SCRIPT); |
151 | 16 | update_if_version_matches_script |
152 | 16 | .load_async(&mut connection_manager) |
153 | 16 | .await?0 ; |
154 | 16 | Ok(Self { |
155 | 16 | uuid: Uuid::new_v4(), |
156 | 16 | update_if_version_matches_script, |
157 | 16 | connection_manager, |
158 | 16 | }) |
159 | 16 | } |
160 | | } |
161 | | |
162 | | impl<C> RedisManager<C> for ClusterRedisManager<C> |
163 | | where |
164 | | C: ConnectionLike + Clone + Send + Sync, |
165 | | { |
166 | 34 | fn get_connection(&self) -> impl Future<Output = Result<(C, Uuid), Error>> + Send { |
167 | 34 | future::ready(Ok((self.connection_manager.clone(), self.uuid))) |
168 | 34 | } |
169 | | |
170 | 0 | fn reconnect(&self, _uuid: Uuid) -> impl Future<Output = Result<(C, Uuid), Error>> + Send { |
171 | 0 | self.get_connection() |
172 | 0 | } |
173 | | |
174 | 0 | fn update_script(&self, key: &str) -> redis::ScriptInvocation<'_> { |
175 | 0 | self.update_if_version_matches_script.key(key) |
176 | 0 | } |
177 | | |
178 | 0 | fn psubscribe(&self, _pattern: &str) -> impl Future<Output = Result<(), Error>> + Send { |
179 | | // This is a no-op for cluster connections. |
180 | 0 | future::ready(Ok(())) |
181 | 0 | } |
182 | | } |
183 | | |
184 | | type RedisConnectFuture<C> = dyn Future<Output = Result<C, Error>> + Send; |
185 | | type RedisConnectFn<C> = dyn Fn() -> Pin<Box<RedisConnectFuture<C>>> + Send + Sync; |
186 | | |
187 | | pub struct StandardRedisManager<C> |
188 | | where |
189 | | C: ConnectionLike + Clone, |
190 | | { |
191 | | /// Function used to re-connect to Redis. |
192 | | connect_func: Box<RedisConnectFn<C>>, |
193 | | |
194 | | /// Redis script used to update a value in redis if the version matches. |
195 | | /// This is done by incrementing the version number and then setting the new |
196 | | /// data only if the version number matches the existing version number. |
197 | | update_if_version_matches_script: Script, |
198 | | |
199 | | /// The client pool connecting to the backing Redis instance(s) and a Uuid |
200 | | /// for this connection in order to avoid multiple reconnection attempts. |
201 | | connection_manager: tokio::sync::RwLock<(C, Uuid)>, |
202 | | |
203 | | /// The set of subscription patterns that should be re-subscribed on reconnect. |
204 | | subscriptions: Mutex<HashSet<String>>, |
205 | | } |
206 | | |
207 | | impl<C> Debug for StandardRedisManager<C> |
208 | | where |
209 | | C: ConnectionLike + Clone, |
210 | | { |
211 | 0 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
212 | 0 | f.debug_struct("StandardRedisManager") |
213 | 0 | .field( |
214 | 0 | "update_if_version_matches_script", |
215 | 0 | &self.update_if_version_matches_script, |
216 | 0 | ) |
217 | 0 | .field("subscriptions", &self.subscriptions) |
218 | 0 | .finish() |
219 | 0 | } |
220 | | } |
221 | | |
222 | | impl<C> StandardRedisManager<C> |
223 | | where |
224 | | C: ConnectionLike + Clone + Send + Sync, |
225 | | { |
226 | 17 | async fn configure(&self, connection_manager: &mut C) -> Result<(), Error> { |
227 | 17 | self.update_if_version_matches_script |
228 | 17 | .load_async(connection_manager) |
229 | 17 | .await?0 ; |
230 | 17 | Ok(()) |
231 | 17 | } |
232 | | |
233 | 18 | async fn new(connect_func: Box<RedisConnectFn<C>>) -> Result<Self, Error> { |
234 | 18 | let connection_manager15 = connect_func().await?3 ; |
235 | 15 | let update_if_version_matches_script = Script::new(LUA_VERSION_SET_SCRIPT); |
236 | 15 | let connection = Self { |
237 | 15 | connect_func, |
238 | 15 | update_if_version_matches_script, |
239 | 15 | connection_manager: tokio::sync::RwLock::new((connection_manager, Uuid::new_v4())), |
240 | 15 | subscriptions: Mutex::new(HashSet::new()), |
241 | 15 | }; |
242 | | { |
243 | 15 | let mut connection_manager = connection.connection_manager.write().await; |
244 | 15 | connection.configure(&mut connection_manager.0).await?0 ; |
245 | | } |
246 | 15 | Ok(connection) |
247 | 18 | } |
248 | | } |
249 | | |
250 | | impl RedisManager<ConnectionManager> for StandardRedisManager<ConnectionManager> { |
251 | 146 | async fn get_connection(&self) -> Result<(ConnectionManager, Uuid), Error> { |
252 | 146 | Ok(self.connection_manager.read().await.clone()) |
253 | 146 | } |
254 | | |
255 | 2 | async fn reconnect(&self, uuid: Uuid) -> Result<(ConnectionManager, Uuid), Error> { |
256 | 2 | let mut guard = self.connection_manager.write().await; |
257 | 2 | if guard.1 != uuid { |
258 | 0 | let connection = guard.clone(); |
259 | 0 | drop(guard); |
260 | 0 | return Ok(connection); |
261 | 2 | } |
262 | 2 | let mut connection_manager = (self.connect_func)().await?0 ; |
263 | 2 | let uuid = Uuid::new_v4(); |
264 | 2 | self.configure(&mut connection_manager).await?0 ; |
265 | 2 | let subscriptions = { |
266 | 2 | let guard = self.subscriptions.lock(); |
267 | 2 | guard.iter().map(Clone::clone).collect::<Vec<_>>() |
268 | | }; |
269 | 2 | for subscription0 in subscriptions { |
270 | 0 | connection_manager.psubscribe(&subscription).await?; |
271 | | } |
272 | 2 | *guard = (connection_manager.clone(), uuid); |
273 | 2 | Ok((connection_manager, uuid)) |
274 | 2 | } |
275 | | |
276 | 18 | fn update_script(&self, key: &str) -> redis::ScriptInvocation<'_> { |
277 | 18 | self.update_if_version_matches_script.key(key) |
278 | 18 | } |
279 | | |
280 | 3 | async fn psubscribe(&self, pattern: &str) -> Result<(), Error> { |
281 | 3 | let mut connection = self.get_connection().await?0 .0; |
282 | 3 | let new_subscription = self.subscriptions.lock().insert(String::from(pattern)); |
283 | 3 | if new_subscription { |
284 | 3 | let result = connection.psubscribe(pattern).await; |
285 | 3 | if result.is_err() { |
286 | 0 | self.subscriptions.lock().remove(pattern); |
287 | 3 | } |
288 | 3 | result?0 ; |
289 | 0 | } |
290 | 3 | Ok(()) |
291 | 3 | } |
292 | | } |
293 | | |
294 | | /// A [`StoreDriver`] implementation that uses Redis as a backing store. |
295 | | #[derive(MetricsComponent)] |
296 | | pub struct RedisStore<C, M> |
297 | | where |
298 | | C: ConnectionLike + Clone, |
299 | | M: RedisManager<C>, |
300 | | { |
301 | | /// The client pool connecting to the backing Redis instance(s). |
302 | | connection_manager: M, |
303 | | |
304 | | /// The underlying connection type in the connection manager. |
305 | | _connection_type: PhantomData<C>, |
306 | | |
307 | | /// A channel to publish updates to when a key is added, removed, or modified. |
308 | | #[metric( |
309 | | help = "The pubsub channel to publish updates to when a key is added, removed, or modified" |
310 | | )] |
311 | | pub_sub_channel: Option<String>, |
312 | | |
313 | | /// A function used to generate names for temporary keys. |
314 | | temp_name_generator_fn: fn() -> String, |
315 | | |
316 | | /// A common prefix to prepend to all keys before they are sent to Redis. |
317 | | /// |
318 | | /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`). |
319 | | #[metric(help = "Prefix to append to all keys before sending to Redis")] |
320 | | key_prefix: String, |
321 | | |
322 | | /// The amount of data to read from Redis at a time. |
323 | | #[metric(help = "The amount of data to read from Redis at a time")] |
324 | | read_chunk_size: usize, |
325 | | |
326 | | /// The maximum number of chunk uploads per update. |
327 | | /// This is used to limit the number of chunk uploads per update to prevent |
328 | | /// overloading when uploading large blocks of data |
329 | | #[metric(help = "The maximum number of chunk uploads per update")] |
330 | | max_chunk_uploads_per_update: usize, |
331 | | |
332 | | /// The COUNT value passed when scanning keys in Redis. |
333 | | /// This is used to hint the amount of work that should be done per response. |
334 | | #[metric(help = "The COUNT value passed when scanning keys in Redis")] |
335 | | scan_count: usize, |
336 | | |
337 | | /// The COUNT value used with search indexes |
338 | | #[metric(help = "The maximum number of results to return per cursor")] |
339 | | max_count_per_cursor: u64, |
340 | | |
341 | | /// A manager for subscriptions to keys in Redis. |
342 | | subscription_manager: tokio::sync::OnceCell<Arc<RedisSubscriptionManager>>, |
343 | | |
344 | | /// Channel for getting subscription messages |
345 | | subscriber_channel: Mutex<Option<UnboundedReceiver<PushInfo>>>, |
346 | | |
347 | | /// Permits to limit inflight Redis requests. Technically only |
348 | | /// limits the calls to `get_client()`, but the requests per client |
349 | | /// are small enough that it works well enough. |
350 | | client_permits: Arc<Semaphore>, |
351 | | |
352 | | /// Per-call ceiling for `check_health` PING. |
353 | | health_check_timeout: Duration, |
354 | | } |
355 | | |
356 | | impl<C, M> Debug for RedisStore<C, M> |
357 | | where |
358 | | C: ConnectionLike + Clone, |
359 | | M: RedisManager<C>, |
360 | | { |
361 | 0 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
362 | 0 | f.debug_struct("RedisStore") |
363 | 0 | .field("temp_name_generator_fn", &self.temp_name_generator_fn) |
364 | 0 | .field("key_prefix", &self.key_prefix) |
365 | 0 | .field("read_chunk_size", &self.read_chunk_size) |
366 | 0 | .field( |
367 | 0 | "max_chunk_uploads_per_update", |
368 | 0 | &self.max_chunk_uploads_per_update, |
369 | 0 | ) |
370 | 0 | .field("scan_count", &self.scan_count) |
371 | 0 | .field("subscription_manager", &self.subscription_manager) |
372 | 0 | .field("subscriber_channel", &self.subscriber_channel) |
373 | 0 | .field("client_permits", &self.client_permits) |
374 | 0 | .finish() |
375 | 0 | } |
376 | | } |
377 | | |
378 | | struct ClientWithPermit<C: ConnectionLike> { |
379 | | connection_manager: C, |
380 | | uuid: Uuid, |
381 | | |
382 | | // here so it sticks around with the client and doesn't get dropped until that does |
383 | | #[allow(dead_code)] |
384 | | semaphore_permit: OwnedSemaphorePermit, |
385 | | } |
386 | | |
387 | | impl<C: ConnectionLike + Clone> ClientWithPermit<C> { |
388 | 1 | async fn reconnect<M: RedisManager<C> + Sync>(&mut self, manager: &M) -> Result<(), Error> { |
389 | 1 | (self.connection_manager, self.uuid) = manager.reconnect(self.uuid).await?0 ; |
390 | 1 | Ok(()) |
391 | 1 | } |
392 | | } |
393 | | |
394 | | impl<C: ConnectionLike> Drop for ClientWithPermit<C> { |
395 | 142 | fn drop(&mut self) { |
396 | 142 | trace!( |
397 | 142 | remaining = self.semaphore_permit.semaphore().available_permits(), |
398 | | "Dropping a client permit" |
399 | | ); |
400 | 142 | } |
401 | | } |
402 | | |
403 | | impl<C, M> RedisStore<C, M> |
404 | | where |
405 | | C: ConnectionLike + Clone + Sync, |
406 | | M: RedisManager<C> + Sync, |
407 | | { |
408 | | /// Used for testing when determinism is required. |
409 | | #[expect(clippy::too_many_arguments)] |
410 | 31 | pub async fn new_from_builder_and_parts( |
411 | 31 | pub_sub_channel: Option<String>, |
412 | 31 | temp_name_generator_fn: fn() -> String, |
413 | 31 | key_prefix: String, |
414 | 31 | read_chunk_size: usize, |
415 | 31 | max_chunk_uploads_per_update: usize, |
416 | 31 | scan_count: usize, |
417 | 31 | max_client_permits: usize, |
418 | 31 | max_count_per_cursor: u64, |
419 | 31 | health_check_timeout: Duration, |
420 | 31 | subscriber_channel: UnboundedReceiver<PushInfo>, |
421 | 31 | connection_manager: M, |
422 | 31 | ) -> Result<Self, Error> { |
423 | 31 | info!("Redis index fingerprint: {FINGERPRINT_CREATE_INDEX_HEX}"); |
424 | | |
425 | 31 | Ok(Self { |
426 | 31 | connection_manager, |
427 | 31 | _connection_type: PhantomData, |
428 | 31 | pub_sub_channel, |
429 | 31 | temp_name_generator_fn, |
430 | 31 | key_prefix, |
431 | 31 | read_chunk_size, |
432 | 31 | max_chunk_uploads_per_update, |
433 | 31 | scan_count, |
434 | 31 | subscription_manager: tokio::sync::OnceCell::new(), |
435 | 31 | subscriber_channel: Mutex::new(Some(subscriber_channel)), |
436 | 31 | client_permits: Arc::new(Semaphore::new(max_client_permits)), |
437 | 31 | max_count_per_cursor, |
438 | 31 | health_check_timeout, |
439 | 31 | }) |
440 | 31 | } |
441 | | |
442 | 142 | async fn get_client(&self) -> Result<ClientWithPermit<C>, Error> { |
443 | 142 | let local_client_permits = self.client_permits.clone(); |
444 | 142 | let remaining = local_client_permits.available_permits(); |
445 | 142 | let semaphore_permit = local_client_permits.acquire_owned().await?0 ; |
446 | 142 | trace!(remaining, "Got a client permit"); |
447 | 142 | let (connection_manager, uuid) = self.connection_manager.get_connection().await?0 ; |
448 | 142 | Ok(ClientWithPermit { |
449 | 142 | connection_manager, |
450 | 142 | uuid, |
451 | 142 | semaphore_permit, |
452 | 142 | }) |
453 | 142 | } |
454 | | |
455 | | /// Encode a [`StoreKey`] so it can be sent to Redis. |
456 | 136 | fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> { |
457 | 136 | let key_body = key.as_str(); |
458 | 136 | if self.key_prefix.is_empty() { |
459 | 132 | key_body |
460 | | } else { |
461 | | // This is in the hot path for all redis operations, so we try to reuse the allocation |
462 | | // from `key.as_str()` if possible. |
463 | 4 | match key_body { |
464 | 4 | Cow::Owned(mut encoded_key) => { |
465 | 4 | encoded_key.insert_str(0, &self.key_prefix); |
466 | 4 | Cow::Owned(encoded_key) |
467 | | } |
468 | 0 | Cow::Borrowed(body) => { |
469 | 0 | let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len()); |
470 | 0 | encoded_key.push_str(&self.key_prefix); |
471 | 0 | encoded_key.push_str(body); |
472 | 0 | Cow::Owned(encoded_key) |
473 | | } |
474 | | } |
475 | | } |
476 | 136 | } |
477 | | |
478 | 18 | fn set_spec_defaults(spec: &mut RedisSpec) -> Result<(), Error> { |
479 | 18 | if spec.addresses.is_empty() { |
480 | 0 | return Err(make_err!( |
481 | 0 | Code::InvalidArgument, |
482 | 0 | "No addresses were specified in redis store configuration." |
483 | 0 | )); |
484 | 18 | } |
485 | | |
486 | 18 | if spec.broadcast_channel_capacity != 0 { |
487 | 1 | warn!("broadcast_channel_capacity in Redis spec is deprecated and ignored"); |
488 | 17 | } |
489 | 18 | if spec.response_timeout_s != 0 { |
490 | 0 | warn!( |
491 | | "response_timeout_s in Redis spec is deprecated and ignored, use command_timeout_ms" |
492 | | ); |
493 | 18 | } |
494 | 18 | if spec.connection_timeout_s != 0 { |
495 | 0 | if spec.connection_timeout_ms != 0 { |
496 | 0 | return Err(make_err!( |
497 | 0 | Code::InvalidArgument, |
498 | 0 | "Both connection_timeout_s and connection_timeout_ms were set, can only have one!" |
499 | 0 | )); |
500 | 0 | } |
501 | 0 | warn!("connection_timeout_s in Redis spec is deprecated, use connection_timeout_ms"); |
502 | 0 | spec.connection_timeout_ms = spec.connection_timeout_s * 1000; |
503 | 18 | } |
504 | 18 | if spec.connection_timeout_ms == 0 { |
505 | 13 | spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS; |
506 | 13 | }5 |
507 | 18 | if spec.command_timeout_ms == 0 { |
508 | 16 | spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS; |
509 | 16 | }2 |
510 | 18 | if spec.health_check_timeout_ms == 0 { |
511 | 18 | spec.health_check_timeout_ms = DEFAULT_HEALTH_CHECK_TIMEOUT_MS; |
512 | 18 | }0 |
513 | 18 | if spec.connection_pool_size == 0 { |
514 | 18 | spec.connection_pool_size = DEFAULT_CONNECTION_POOL_SIZE; |
515 | 18 | }0 |
516 | 18 | if spec.read_chunk_size == 0 { |
517 | 18 | spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE; |
518 | 18 | }0 |
519 | 18 | if spec.max_count_per_cursor == 0 { |
520 | 18 | spec.max_count_per_cursor = DEFAULT_MAX_COUNT_PER_CURSOR; |
521 | 18 | }0 |
522 | 18 | if spec.max_chunk_uploads_per_update == 0 { |
523 | 18 | spec.max_chunk_uploads_per_update = DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE; |
524 | 18 | }0 |
525 | 18 | if spec.scan_count == 0 { |
526 | 18 | spec.scan_count = DEFAULT_SCAN_COUNT; |
527 | 18 | }0 |
528 | 18 | if spec.max_client_permits == 0 { |
529 | 18 | spec.max_client_permits = DEFAULT_CLIENT_PERMITS; |
530 | 18 | }0 |
531 | 18 | if spec.retry.delay == 0.0 { |
532 | 18 | spec.retry.delay = DEFAULT_RETRY_DELAY; |
533 | 18 | }0 |
534 | 18 | if spec.retry.max_retries == 0 { |
535 | 18 | spec.retry.max_retries = 1; |
536 | 18 | }0 |
537 | 18 | trace!(?spec, "redis spec is after setting defaults"); |
538 | 18 | Ok(()) |
539 | 18 | } |
540 | | |
541 | | // Only used by tests: they need to make a real Redis connection first, then replace this generator to get fixed values. |
542 | 6 | pub fn replace_temp_name_generator(&mut self, replacement: fn() -> String) { |
543 | 6 | self.temp_name_generator_fn = replacement; |
544 | 6 | } |
545 | | } |
546 | | |
547 | | impl RedisStore<ClusterConnection, ClusterRedisManager<ClusterConnection>> { |
548 | 0 | pub async fn new_cluster(mut spec: RedisSpec) -> Result<Arc<Self>, Error> { |
549 | 0 | if spec.mode != RedisMode::Cluster { |
550 | 0 | return Err(Error::new( |
551 | 0 | Code::InvalidArgument, |
552 | 0 | "new_cluster only works for Cluster mode".to_string(), |
553 | 0 | )); |
554 | 0 | } |
555 | 0 | Self::set_spec_defaults(&mut spec)?; |
556 | | |
557 | 0 | let parsed_addrs: Vec<_> = spec |
558 | 0 | .addresses |
559 | 0 | .iter_mut() |
560 | 0 | .map(|addr| { |
561 | 0 | addr.clone().into_connection_info().map(|connection_info| { |
562 | 0 | let redis_settings = connection_info |
563 | 0 | .redis_settings() |
564 | 0 | .clone() |
565 | | // We need RESP3 here because the cluster mode doesn't support RESP2 pubsub |
566 | | // See also https://docs.rs/redis/latest/redis/cluster_async/index.html#pubsub |
567 | 0 | .set_protocol(redis::ProtocolVersion::RESP3); |
568 | 0 | connection_info.set_redis_settings(redis_settings) |
569 | 0 | }) |
570 | 0 | }) |
571 | 0 | .collect::<Result<Vec<_>, _>>()?; |
572 | | |
573 | 0 | let connection_timeout = Duration::from_millis(spec.connection_timeout_ms); |
574 | 0 | let command_timeout = Duration::from_millis(spec.command_timeout_ms); |
575 | 0 | let (tx, subscriber_channel) = unbounded_channel(); |
576 | | |
577 | 0 | let builder = ClusterClient::builder(parsed_addrs) |
578 | 0 | .connection_timeout(connection_timeout) |
579 | 0 | .response_timeout(command_timeout) |
580 | 0 | .push_sender(tx) |
581 | 0 | .retries(u32::try_from(spec.retry.max_retries)?); |
582 | | |
583 | 0 | let client = builder.build()?; |
584 | | |
585 | 0 | Self::new_from_builder_and_parts( |
586 | 0 | spec.experimental_pub_sub_channel, |
587 | 0 | || Uuid::new_v4().to_string(), |
588 | 0 | spec.key_prefix.clone(), |
589 | 0 | spec.read_chunk_size, |
590 | 0 | spec.max_chunk_uploads_per_update, |
591 | 0 | spec.scan_count, |
592 | 0 | spec.max_client_permits, |
593 | 0 | spec.max_count_per_cursor, |
594 | 0 | Duration::from_millis(spec.health_check_timeout_ms), |
595 | 0 | subscriber_channel, |
596 | 0 | ClusterRedisManager::new(client.get_async_connection().await?).await?, |
597 | | ) |
598 | 0 | .await |
599 | 0 | .map(Arc::new) |
600 | 0 | } |
601 | | } |
602 | | |
603 | | impl RedisStore<ConnectionManager, StandardRedisManager<ConnectionManager>> { |
604 | 20 | async fn connect( |
605 | 20 | spec: RedisSpec, |
606 | 20 | tx: UnboundedSender<PushInfo>, |
607 | 20 | ) -> Result<ConnectionManager, Error> { |
608 | 20 | let connection_timeout = Duration::from_millis(spec.connection_timeout_ms); |
609 | 20 | let command_timeout = Duration::from_millis(spec.command_timeout_ms); |
610 | | |
611 | 20 | let addr = &spec.addresses[0]; |
612 | 20 | let local_addr = addr.clone(); |
613 | 20 | let mut parsed_addr = local_addr |
614 | 20 | .replace("redis+sentinel://", "redis://") |
615 | 20 | .into_connection_info()?0 ; |
616 | | |
617 | 20 | let redis_settings = parsed_addr |
618 | 20 | .redis_settings() |
619 | 20 | .clone() |
620 | | // We need RESP3 here because we want to do set_push_sender |
621 | 20 | .set_protocol(redis::ProtocolVersion::RESP3); |
622 | 20 | parsed_addr = parsed_addr.set_redis_settings(redis_settings); |
623 | 20 | debug!(?parsed_addr, "Parsed redis addr"); |
624 | | |
625 | 20 | let client19 = timeout( |
626 | 20 | connection_timeout, |
627 | 20 | spawn!("connect", async move { |
628 | 20 | match spec.mode { |
629 | 11 | RedisMode::Standard => Client::open(parsed_addr).map_err(Into::<Error>::into), |
630 | | RedisMode::Cluster => { |
631 | 0 | return Err(Error::new( |
632 | 0 | Code::Internal, |
633 | 0 | "Use RedisStore::new_cluster for cluster connections".to_owned(), |
634 | 0 | )); |
635 | | } |
636 | 9 | RedisMode::Sentinel => async { |
637 | 9 | let url_parsing = Url::parse(&local_addr)?0 ; |
638 | 9 | let master_name = url_parsing |
639 | 9 | .query_pairs() |
640 | 9 | .find(|(key, _)| key1 == "sentinelServiceName"1 ) |
641 | 9 | .map_or_else(|| "master"8 .into8 (), |(_, value)| value1 .to_string1 ()); |
642 | | |
643 | 9 | let redis_connection_info = parsed_addr.redis_settings().clone(); |
644 | 9 | let sentinel_connection_info = SentinelNodeConnectionInfo::default() |
645 | 9 | .set_redis_connection_info(redis_connection_info); |
646 | | |
647 | | // We fish this out because sentinels don't support db, we need to set it |
648 | | // on the client only. See also https://github.com/redis-rs/redis-rs/issues/1950 |
649 | 9 | let original_db = parsed_addr.redis_settings().db(); |
650 | 9 | if original_db != 0 { |
651 | 1 | // sentinel_connection_info has the actual DB set |
652 | 1 | let revised_settings = parsed_addr.redis_settings().clone().set_db(0); |
653 | 1 | parsed_addr = parsed_addr.set_redis_settings(revised_settings); |
654 | 8 | } |
655 | | |
656 | 9 | SentinelClient::build( |
657 | 9 | vec![parsed_addr], |
658 | 9 | master_name, |
659 | 9 | Some(sentinel_connection_info), |
660 | 9 | SentinelServerType::Master, |
661 | | ) |
662 | 9 | .map_err(Into::<Error>::into) |
663 | 9 | } |
664 | 18 | .and_then9 (|mut s| async move {9 Ok(s9 .async_get_client().await) }) |
665 | 9 | .await?0 |
666 | 9 | .map_err(Into::<Error>::into), |
667 | | } |
668 | 20 | .err_tip_with_code(|_e| {1 |
669 | 1 | ( |
670 | 1 | Code::InvalidArgument, |
671 | 1 | format!("While connecting to redis with url: {local_addr}"), |
672 | 1 | ) |
673 | 1 | }) |
674 | 20 | }), |
675 | | ) |
676 | 20 | .await |
677 | 20 | .err_tip(|| format!0 ("Timeout while connecting to redis with url: {addr}"))?0 ?0 ?1 ; |
678 | | |
679 | 19 | let connection_manager_config = { |
680 | 19 | ConnectionManagerConfig::new() |
681 | 19 | .set_number_of_retries(spec.retry.max_retries) |
682 | 19 | .set_connection_timeout(Some(connection_timeout)) |
683 | 19 | .set_response_timeout(Some(command_timeout)) |
684 | 19 | .set_push_sender(tx) |
685 | | }; |
686 | | |
687 | 17 | let mut connection_manager = |
688 | 19 | ConnectionManager::new_with_config(client, connection_manager_config) |
689 | 19 | .await |
690 | 19 | .err_tip(|| format!2 ("While connecting to redis with url: {addr}"))?2 ; |
691 | | |
692 | 17 | if let Some(pub_sub_channel3 ) = spec.experimental_pub_sub_channel { |
693 | 3 | connection_manager.psubscribe(pub_sub_channel).await?0 ; |
694 | 14 | } |
695 | | |
696 | 17 | Ok(connection_manager) |
697 | 20 | } |
698 | | |
699 | | /// Create a new `RedisStore` from the given configuration. |
700 | 18 | pub async fn new_standard(mut spec: RedisSpec) -> Result<Arc<Self>, Error> { |
701 | 18 | Self::set_spec_defaults(&mut spec)?0 ; |
702 | | |
703 | 18 | if spec.addresses.len() != 1 { |
704 | 0 | return Err(make_err!( |
705 | 0 | Code::Unimplemented, |
706 | 0 | "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discover to find the other nodes." |
707 | 0 | )); |
708 | 18 | } |
709 | | |
710 | 18 | let (tx, subscriber_channel) = unbounded_channel(); |
711 | | |
712 | 15 | Self::new_from_builder_and_parts( |
713 | 18 | spec.experimental_pub_sub_channel.clone(), |
714 | 1 | || Uuid::new_v4().to_string(), |
715 | 18 | spec.key_prefix.clone(), |
716 | 18 | spec.read_chunk_size, |
717 | 18 | spec.max_chunk_uploads_per_update, |
718 | 18 | spec.scan_count, |
719 | 18 | spec.max_client_permits, |
720 | 18 | spec.max_count_per_cursor, |
721 | 18 | Duration::from_millis(spec.health_check_timeout_ms), |
722 | 18 | subscriber_channel, |
723 | 20 | StandardRedisManager::new18 (Box::new18 (move || { |
724 | 20 | Box::pin(Self::connect(spec.clone(), tx.clone())) |
725 | 20 | })) |
726 | 18 | .await?3 , |
727 | | ) |
728 | 15 | .await |
729 | 15 | .map(Arc::new) |
730 | 18 | } |
731 | | } |
732 | | |
733 | | #[async_trait] |
734 | | impl<C, M> StoreDriver for RedisStore<C, M> |
735 | | where |
736 | | C: ConnectionLike + Clone + Send + Sync + Unpin + 'static, |
737 | | M: RedisManager<C> + Unpin + Send + Sync + 'static, |
738 | | { |
739 | | async fn has_with_results( |
740 | | self: Pin<&Self>, |
741 | | keys: &[StoreKey<'_>], |
742 | | results: &mut [Option<u64>], |
743 | 6 | ) -> Result<(), Error> { |
744 | | // TODO(palfrey) We could use pipeline here, but it makes retry more |
745 | | // difficult and it doesn't work very well in cluster mode. |
746 | | // If we wanted to optimize this with pipeline be careful to |
747 | | // implement retry and to support cluster mode. |
748 | | |
749 | | izip!(keys.iter(), results.iter_mut(),) |
750 | 6 | .map(|(key, result)| async move { |
751 | | // We need to do a special pass to ensure our zero key exist. |
752 | 6 | if is_zero_digest(key.borrow()) { |
753 | 2 | *result = Some(0); |
754 | 2 | return Ok::<_, Error>(()); |
755 | 4 | } |
756 | 4 | let encoded_key = self.encode_key(key); |
757 | | |
758 | 4 | let mut client = self.get_client().await?0 ; |
759 | | |
760 | | // Redis returns 0 when the key doesn't exist |
761 | | // AND when the key exists with value of length 0. |
762 | | // Therefore, we need to check both length and existence |
763 | | // and do it in a pipeline for efficiency |
764 | 4 | let (blob_len, exists) = pipe() |
765 | 4 | .strlen(encoded_key.as_ref()) |
766 | 4 | .exists(encoded_key.as_ref()) |
767 | 4 | .query_async::<(u64, bool)>(&mut client.connection_manager) |
768 | 4 | .await |
769 | 4 | .err_tip(|| "In RedisStore::has_with_results::all")?0 ; |
770 | | |
771 | 4 | *result = if exists { Some(blob_len) } else { None0 }; |
772 | | |
773 | 4 | Ok::<_, Error>(()) |
774 | 12 | }) |
775 | | .collect::<FuturesUnordered<_>>() |
776 | | .try_collect() |
777 | | .await |
778 | 6 | } |
779 | | |
780 | | async fn list( |
781 | | self: Pin<&Self>, |
782 | | range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>), |
783 | | handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_), |
784 | 8 | ) -> Result<u64, Error> { |
785 | | let range = ( |
786 | | range.0.map(StoreKey::into_owned), |
787 | | range.1.map(StoreKey::into_owned), |
788 | | ); |
789 | | let pattern = match range.0 { |
790 | | Bound::Included(ref start) | Bound::Excluded(ref start) => match range.1 { |
791 | | Bound::Included(ref end) | Bound::Excluded(ref end) => { |
792 | | let start = start.as_str(); |
793 | | let end = end.as_str(); |
794 | | let max_length = start.len().min(end.len()); |
795 | | let length = start |
796 | | .chars() |
797 | | .zip(end.chars()) |
798 | 20 | .position(|(a, b)| a != b) |
799 | | .unwrap_or(max_length); |
800 | | format!("{}{}*", self.key_prefix, &start[..length]) |
801 | | } |
802 | | Bound::Unbounded => format!("{}*", self.key_prefix), |
803 | | }, |
804 | | Bound::Unbounded => format!("{}*", self.key_prefix), |
805 | | }; |
806 | | let mut client = self.get_client().await?; |
807 | | trace!(%pattern, count=self.scan_count, "Running SCAN"); |
808 | | let opts = ScanOptions::default() |
809 | | .with_pattern(pattern) |
810 | | .with_count(self.scan_count); |
811 | | let mut scan_stream: AsyncIter<Value> = client |
812 | | .connection_manager |
813 | | .scan_options(opts) |
814 | | .await |
815 | | .err_tip(|| "During scan_options")?; |
816 | | let mut iterations = 0; |
817 | | let mut errors = vec![]; |
818 | | while let Some(key) = scan_stream.next_item().await { |
819 | | if let Ok(Value::BulkString(raw_key)) = key { |
820 | | let Ok(str_key) = str::from_utf8(&raw_key) else { |
821 | | error!(?raw_key, "Non-utf8 key"); |
822 | | errors.push(format!("Non-utf8 key {raw_key:?}")); |
823 | | continue; |
824 | | }; |
825 | | if let Some(key) = str_key.strip_prefix(&self.key_prefix) { |
826 | | let key = StoreKey::new_str(key); |
827 | | if range.contains(&key) { |
828 | | iterations += 1; |
829 | | if !handler(&key) { |
830 | | error!("Issue in handler"); |
831 | | errors.push("Issue in handler".to_string()); |
832 | | } |
833 | | } else { |
834 | | trace!(%key, ?range, "Key not in range"); |
835 | | } |
836 | | } else { |
837 | | errors.push("Key doesn't match prefix".to_string()); |
838 | | } |
839 | | } else { |
840 | | error!(?key, "Non-string in key"); |
841 | | errors.push("Non-string in key".to_string()); |
842 | | } |
843 | | } |
844 | | if errors.is_empty() { |
845 | | Ok(iterations) |
846 | | } else { |
847 | | error!(?errors, "Errors in scan stream"); |
848 | | Err(Error::new(Code::Internal, format!("Errors: {errors:?}"))) |
849 | | } |
850 | 8 | } |
851 | | |
852 | | async fn update( |
853 | | self: Pin<&Self>, |
854 | | key: StoreKey<'_>, |
855 | | mut reader: DropCloserReadHalf, |
856 | | _upload_size: UploadSizeInfo, |
857 | 9 | ) -> Result<u64, Error> { |
858 | | let final_key = self.encode_key(&key); |
859 | | |
860 | | // While the name generation function can be supplied by the user, we need to have the curly |
861 | | // braces in place in order to manage redis' hashing behavior and make sure that the temporary |
862 | | // key name and the final key name are directed to the same cluster node. See |
863 | | // https://redis.io/blog/redis-clustering-best-practices-with-keys/ |
864 | | // |
865 | | // The TL;DR is that if we're in cluster mode and the names hash differently, we can't use request |
866 | | // pipelining. By using these braces, we tell redis to only hash the part of the temporary key that's |
867 | | // identical to the final key -- so they will always hash to the same node. |
868 | | let temp_key = format!( |
869 | | "temp-{}-{{{}}}", |
870 | | (self.temp_name_generator_fn)(), |
871 | | &final_key |
872 | | ); |
873 | | |
874 | | if is_zero_digest(key.borrow()) { |
875 | | let chunk = reader |
876 | | .peek() |
877 | | .await |
878 | | .err_tip(|| "Failed to peek in RedisStore::update")?; |
879 | | if chunk.is_empty() { |
880 | | reader |
881 | | .drain() |
882 | | .await |
883 | | .err_tip(|| "Failed to drain in RedisStore::update")?; |
884 | | // Zero-digest keys are special -- we don't need to do anything with it. |
885 | | return Ok(0); |
886 | | } |
887 | | } |
888 | | |
889 | | let mut client = self.get_client().await?; |
890 | | |
891 | | let mut read_stream = reader |
892 | 8 | .scan(0u32, |bytes_read, chunk_res| { |
893 | 8 | future::ready(Some( |
894 | 8 | chunk_res |
895 | 8 | .err_tip(|| "Failed to read chunk in update in redis store") |
896 | 8 | .and_then(|chunk| {7 |
897 | 7 | let offset = isize::try_from(*bytes_read).err_tip(|| "Could not convert offset to isize in RedisStore::update")?0 ; |
898 | 7 | let chunk_len = u32::try_from(chunk.len()).err_tip( |
899 | | || "Could not convert chunk length to u32 in RedisStore::update", |
900 | 0 | )?; |
901 | 7 | let new_bytes_read = bytes_read |
902 | 7 | .checked_add(chunk_len) |
903 | 7 | .err_tip(|| "Overflow protection in RedisStore::update")?0 ; |
904 | 7 | *bytes_read = new_bytes_read; |
905 | 7 | Ok::<_, Error>((offset, *bytes_read, chunk)) |
906 | 7 | }), |
907 | | )) |
908 | 8 | }) |
909 | 8 | .map(|res| { |
910 | 8 | let (offset7 , end_pos7 , chunk7 ) = res?1 ; |
911 | 7 | let temp_key_ref = &temp_key; |
912 | 7 | Ok(async move { |
913 | 7 | let (mut connection_manager, connect_id) = self.connection_manager.get_connection().await?0 ; |
914 | 7 | match connection_manager |
915 | 7 | .setrange::<_, _, usize>(temp_key_ref, offset, chunk.to_vec()) |
916 | 7 | .await { |
917 | 5 | Ok(_) => {}, |
918 | 1 | Err(err) |
919 | 2 | if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly)1 => |
920 | | { |
921 | 1 | let (mut connection_manager, _connect_id) = self.connection_manager.reconnect(connect_id).await?0 ; |
922 | 1 | connection_manager |
923 | 1 | .setrange::<_, _, usize>(temp_key_ref, offset, chunk.to_vec()) |
924 | 1 | .await |
925 | 1 | .err_tip( |
926 | 0 | || format!("(after reconnect) while appending to temp key ({temp_key_ref}) in RedisStore::update. offset = {offset}. end_pos = {end_pos}"), |
927 | 0 | )?; |
928 | | } |
929 | 1 | Err(err) => { |
930 | 1 | let mut error: Error = err.into(); |
931 | 1 | error |
932 | 1 | .messages |
933 | 1 | .push(format!("While appending to temp key ({temp_key_ref}) in RedisStore::update. offset = {offset}. end_pos = {end_pos}")); |
934 | 1 | return Err(error); |
935 | | } |
936 | | } |
937 | 6 | Ok::<u32, Error>(end_pos) |
938 | 7 | }) |
939 | 8 | }) |
940 | | .try_buffer_unordered(self.max_chunk_uploads_per_update); |
941 | | |
942 | | let mut total_len: u32 = 0; |
943 | | while let Some(last_pos) = read_stream.try_next().await? { |
944 | | if last_pos > total_len { |
945 | | total_len = last_pos; |
946 | | } |
947 | | } |
948 | | |
949 | | let blob_len: usize = client |
950 | | .connection_manager |
951 | | .strlen(&temp_key) |
952 | | .await |
953 | 0 | .err_tip(|| format!("In RedisStore::update strlen check for {temp_key}"))?; |
954 | | // This is a safety check to ensure that in the event some kind of retry was to happen |
955 | | // and the data was appended to the key twice, we reject the data. |
956 | | if blob_len != usize::try_from(total_len).unwrap_or(usize::MAX) { |
957 | | return Err(make_input_err!( |
958 | | "Data length mismatch in RedisStore::update for {}({}) - expected {} bytes, got {} bytes", |
959 | | key.borrow().as_str(), |
960 | | temp_key, |
961 | | total_len, |
962 | | blob_len, |
963 | | )); |
964 | | } |
965 | | |
966 | | // Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost. |
967 | | client |
968 | | .connection_manager |
969 | | .rename::<_, _, ()>(&temp_key, final_key.as_ref()) |
970 | | .await |
971 | | .err_tip(|| "While queueing key rename in RedisStore::update()")?; |
972 | | |
973 | | // If we have a publish channel configured, send a notice that the key has been set. |
974 | | if let Some(pub_sub_channel) = &self.pub_sub_channel { |
975 | | client |
976 | | .connection_manager |
977 | | .publish::<_, _, ()>(pub_sub_channel, final_key.as_ref()) |
978 | | .await?; |
979 | | } |
980 | | |
981 | | Ok(blob_len.try_into().unwrap_or(0)) |
982 | 9 | } |
983 | | |
984 | | async fn get_part( |
985 | | self: Pin<&Self>, |
986 | | key: StoreKey<'_>, |
987 | | writer: &mut DropCloserWriteHalf, |
988 | | offset: u64, |
989 | | length: Option<u64>, |
990 | 5 | ) -> Result<(), Error> { |
991 | | let offset = isize::try_from(offset).err_tip(|| "Could not convert offset to isize")?; |
992 | | let length = length |
993 | 4 | .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize")) |
994 | | .transpose()?; |
995 | | |
996 | | // To follow RBE spec we need to consider any digest's with |
997 | | // zero size to be existing. |
998 | | if is_zero_digest(key.borrow()) { |
999 | | return writer |
1000 | | .send_eof() |
1001 | | .err_tip(|| "Failed to send zero EOF in redis store get_part"); |
1002 | | } |
1003 | | |
1004 | | let encoded_key = self.encode_key(&key); |
1005 | | let encoded_key = encoded_key.as_ref(); |
1006 | | |
1007 | | // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we |
1008 | | // do math with indices we change them to be exclusive at the end. |
1009 | | |
1010 | | // We want to read the data at the key from `offset` to `offset + length`. |
1011 | | let data_start = offset; |
1012 | | let data_end = data_start |
1013 | | .saturating_add(length.unwrap_or(isize::MAX as usize) as isize) |
1014 | | .saturating_sub(1); |
1015 | | |
1016 | | // And we don't ever want to read more than `read_chunk_size` bytes at a time, so we'll need to iterate. |
1017 | | let mut chunk_start = data_start; |
1018 | | let mut chunk_end = cmp::min( |
1019 | | data_start.saturating_add(self.read_chunk_size as isize) - 1, |
1020 | | data_end, |
1021 | | ); |
1022 | | |
1023 | | let mut client = self.get_client().await?; |
1024 | | loop { |
1025 | | let chunk: Bytes = client |
1026 | | .connection_manager |
1027 | | .getrange(encoded_key, chunk_start, chunk_end) |
1028 | | .await |
1029 | | .err_tip(|| "In RedisStore::get_part::getrange")?; |
1030 | | |
1031 | | let didnt_receive_full_chunk = chunk.len() < self.read_chunk_size; |
1032 | | let reached_end_of_data = chunk_end == data_end; |
1033 | | |
1034 | | if didnt_receive_full_chunk || reached_end_of_data { |
1035 | | if !chunk.is_empty() { |
1036 | | writer |
1037 | | .send(chunk) |
1038 | | .await |
1039 | | .err_tip(|| "Failed to write data in RedisStore::get_part")?; |
1040 | | } |
1041 | | |
1042 | | break; // No more data to read. |
1043 | | } |
1044 | | |
1045 | | // We received a full chunk's worth of data, so write it... |
1046 | | writer |
1047 | | .send(chunk) |
1048 | | .await |
1049 | | .err_tip(|| "Failed to write data in RedisStore::get_part")?; |
1050 | | |
1051 | | // ...and go grab the next chunk. |
1052 | | chunk_start = chunk_end + 1; |
1053 | | chunk_end = cmp::min( |
1054 | | chunk_start.saturating_add(self.read_chunk_size as isize) - 1, |
1055 | | data_end, |
1056 | | ); |
1057 | | } |
1058 | | |
1059 | | // If we didn't write any data, check if the key exists, if not return a NotFound error. |
1060 | | // This is required by spec. |
1061 | | if writer.get_bytes_written() == 0 { |
1062 | | // We're supposed to read 0 bytes, so just check if the key exists. |
1063 | | let exists: bool = client |
1064 | | .connection_manager |
1065 | | .exists(encoded_key) |
1066 | | .await |
1067 | | .err_tip(|| "In RedisStore::get_part::zero_exists")?; |
1068 | | |
1069 | | if !exists { |
1070 | | return Err(make_err!( |
1071 | | Code::NotFound, |
1072 | | "Data not found in Redis store for digest: {key:?}" |
1073 | | )); |
1074 | | } |
1075 | | } |
1076 | | |
1077 | | writer |
1078 | | .send_eof() |
1079 | | .err_tip(|| "Failed to write EOF in redis store get_part") |
1080 | 5 | } |
1081 | | |
1082 | 0 | fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver { |
1083 | 0 | self |
1084 | 0 | } |
1085 | | |
1086 | 0 | fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) { |
1087 | 0 | self |
1088 | 0 | } |
1089 | | |
1090 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> { |
1091 | 0 | self |
1092 | 0 | } |
1093 | | |
1094 | 0 | fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) { |
1095 | 0 | registry.register_indicator(self); |
1096 | 0 | } |
1097 | | |
1098 | 0 | fn register_remove_callback( |
1099 | 0 | self: Arc<Self>, |
1100 | 0 | _callback: Arc<dyn RemoveItemCallback>, |
1101 | 0 | ) -> Result<(), Error> { |
1102 | | // As redis doesn't drop stuff, we can just ignore this |
1103 | 0 | Ok(()) |
1104 | 0 | } |
1105 | | } |
1106 | | |
1107 | | #[async_trait] |
1108 | | impl<C, M> HealthStatusIndicator for RedisStore<C, M> |
1109 | | where |
1110 | | C: ConnectionLike + Clone + Send + Sync + Unpin + 'static, |
1111 | | M: RedisManager<C> + Send + Sync + Unpin + 'static, |
1112 | | { |
1113 | 1 | fn get_name(&self) -> &'static str { |
1114 | 1 | "RedisStore" |
1115 | 1 | } |
1116 | | |
1117 | | /// Lightweight health check: just `PING` the master, bounded by a |
1118 | | /// short physical timeout. The default `StoreDriver::check_health` |
1119 | | /// performs a full `update_oneshot` + `has` + `get_part_unchunked` |
1120 | | /// roundtrip, which queues behind real production traffic on the |
1121 | | /// same connection-permit semaphore and Redis master. When the |
1122 | | /// store is even moderately loaded that easily exceeds the |
1123 | | /// `HealthServer` per-indicator budget (default 5 s), each |
1124 | | /// RedisStore-backed indicator (AC, small-blob CAS, scheduler) |
1125 | | /// reports `HealthStatus::Timeout`, and `/status` returns 503 — |
1126 | | /// surfaced as a readiness-probe failure that sheds traffic from |
1127 | | /// an otherwise-functional pod. A `PING` proves the connection |
1128 | | /// is reachable and the master is accepting commands; that is |
1129 | | /// the only invariant a kubelet probe needs. |
1130 | 0 | async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus { |
1131 | | let mut client = match self.get_client().await { |
1132 | | Ok(c) => c, |
1133 | | Err(e) => { |
1134 | | return HealthStatus::new_failed( |
1135 | | self, |
1136 | | format!("RedisStore::check_health: failed to acquire connection: {e}").into(), |
1137 | | ); |
1138 | | } |
1139 | | }; |
1140 | | |
1141 | | // Hold the `ClientWithPermit` for the duration of the call so |
1142 | | // its `Drop` releases the semaphore permit on exit. We just |
1143 | | // need a `&mut` to the connection manager underneath. |
1144 | 0 | let ping = async { |
1145 | 0 | redis::cmd("PING") |
1146 | 0 | .query_async::<()>(&mut client.connection_manager) |
1147 | 0 | .await |
1148 | 0 | }; |
1149 | | match timeout(self.health_check_timeout, ping).await { |
1150 | | Ok(Ok(())) => HealthStatus::new_ok(self, "RedisStore::check_health: PING ok".into()), |
1151 | | Ok(Err(e)) => HealthStatus::new_failed( |
1152 | | self, |
1153 | | format!("RedisStore::check_health: PING errored: {e}").into(), |
1154 | | ), |
1155 | | Err(_) => HealthStatus::new_failed( |
1156 | | self, |
1157 | | format!( |
1158 | | "RedisStore::check_health: PING exceeded {}ms timeout", |
1159 | | self.health_check_timeout.as_millis() |
1160 | | ) |
1161 | | .into(), |
1162 | | ), |
1163 | | } |
1164 | 0 | } |
1165 | | } |
1166 | | |
1167 | | // ------------------------------------------------------------------- |
1168 | | // Below this line are specific to the redis scheduler implementation. |
1169 | | // ------------------------------------------------------------------- |
1170 | | |
1171 | | /// The time in milliseconds that a redis cursor can be idle before it is closed. |
1172 | | const CURSOR_IDLE_MS: u64 = 30_000; |
1173 | | /// The name of the field in the Redis hash that stores the data. |
1174 | | const DATA_FIELD_NAME: &str = "data"; |
1175 | | /// The name of the field in the Redis hash that stores the version. |
1176 | | const VERSION_FIELD_NAME: &str = "version"; |
1177 | | /// The time to live of indexes in seconds. After this time redis may delete the index. |
1178 | | const INDEX_TTL_S: u64 = 60 * 60 * 24; // 24 hours. |
1179 | | |
1180 | | #[allow(rustdoc::broken_intra_doc_links)] |
1181 | | /// Lua script to set a key if the version matches. |
1182 | | /// Args: |
1183 | | /// KEYS[1]: The key where the version is stored. |
1184 | | /// ARGV[1]: The expected version. |
1185 | | /// ARGV[2]: TTL in seconds, or 0 for forever |
1186 | | /// ARGV[3]: The new data. |
1187 | | /// ARGV[4*]: Key-value pairs of additional data to include. |
1188 | | /// Returns: |
1189 | | /// The new version if the version matches. nil is returned if the |
1190 | | /// value was not set. |
1191 | | pub const LUA_VERSION_SET_SCRIPT: &str = formatcp!( |
1192 | | r" |
1193 | | local key = KEYS[1] |
1194 | | local expected_version = tonumber(ARGV[1]) |
1195 | | local ttl = tonumber(ARGV[2]) |
1196 | | local new_data = ARGV[3] |
1197 | | local new_version = redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', 1) |
1198 | | local i |
1199 | | local indexes = {{}} |
1200 | | |
1201 | | if new_version-1 ~= expected_version then |
1202 | | redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', -1) |
1203 | | return {{ 0, new_version-1 }} |
1204 | | end |
1205 | | -- Skip first 3 argvs, as they are known inputs. |
1206 | | -- Remember: Lua is 1-indexed. |
1207 | | for i=4, #ARGV do |
1208 | | indexes[i-3] = ARGV[i] |
1209 | | end |
1210 | | |
1211 | | -- In testing we witnessed redis sometimes not update our FT indexes |
1212 | | -- resulting in stale data. It appears if we delete our keys then insert |
1213 | | -- them again it works and reduces risk significantly. |
1214 | | redis.call('DEL', key) |
1215 | | redis.call('HSET', key, '{DATA_FIELD_NAME}', new_data, '{VERSION_FIELD_NAME}', new_version, unpack(indexes)) |
1216 | | |
1217 | | if ttl ~= 0 then |
1218 | | redis.call('EXPIRE', key, ttl) |
1219 | | end |
1220 | | return {{ 1, new_version }} |
1221 | | " |
1222 | | ); |
1223 | | |
1224 | | /// This is the output of the calculations below hardcoded into the executable. |
1225 | | const FINGERPRINT_CREATE_INDEX_HEX: &str = "3e762c15"; |
1226 | | |
1227 | | #[cfg(test)] |
1228 | | mod test { |
1229 | | use super::FINGERPRINT_CREATE_INDEX_HEX; |
1230 | | |
1231 | | /// String of the `FT.CREATE` command used to create the index template. |
1232 | | const CREATE_INDEX_TEMPLATE: &str = "FT.CREATE {} ON HASH PREFIX 1 {} NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA {} TAG CASESENSITIVE SORTABLE"; |
1233 | | |
1234 | | /// Compile-time fingerprint of the `FT.CREATE` command used to create the |
1235 | | /// index template. This is a simple CRC32 checksum of the command string. |
1236 | | /// We don't care about it actually being a valid CRC32 checksum, just that |
1237 | | /// it's a unique identifier with a low chance of collision. |
1238 | 1 | const fn fingerprint_create_index_template() -> u32 { |
1239 | | const POLY: u32 = 0xEDB8_8320; |
1240 | | const DATA: &[u8] = CREATE_INDEX_TEMPLATE.as_bytes(); |
1241 | 1 | let mut crc = 0xFFFF_FFFF; |
1242 | 1 | let mut i = 0; |
1243 | 102 | while i < DATA.len() { |
1244 | 101 | let byte = DATA[i]; |
1245 | 101 | crc ^= byte as u32; |
1246 | | |
1247 | 101 | let mut j = 0; |
1248 | 909 | while j < 8 { |
1249 | 808 | crc = if crc & 1 != 0 { |
1250 | 386 | (crc >> 1) ^ POLY |
1251 | | } else { |
1252 | 422 | crc >> 1 |
1253 | | }; |
1254 | 808 | j += 1; |
1255 | | } |
1256 | 101 | i += 1; |
1257 | | } |
1258 | 1 | crc |
1259 | 1 | } |
1260 | | |
1261 | | /// Verify that our calculation always evaluates to this fixed value. |
1262 | | #[test] |
1263 | 1 | fn test_fingerprint_value() { |
1264 | 1 | assert_eq!( |
1265 | 1 | format!("{:08x}", &fingerprint_create_index_template()), |
1266 | | FINGERPRINT_CREATE_INDEX_HEX, |
1267 | | ); |
1268 | 1 | } |
1269 | | } |
1270 | | |
1271 | | /// Get the name of the index to create for the given field. |
1272 | | /// This will add some prefix data to the name to try and ensure |
1273 | | /// if the index definition changes, the name will get a new name. |
1274 | | macro_rules! get_index_name { |
1275 | | ($prefix:expr, $field:expr, $maybe_sort:expr) => { |
1276 | | format_args!( |
1277 | | "{}_{}_{}_{}", |
1278 | | $prefix, |
1279 | | $field, |
1280 | | $maybe_sort.unwrap_or(""), |
1281 | | FINGERPRINT_CREATE_INDEX_HEX |
1282 | | ) |
1283 | | }; |
1284 | | } |
1285 | | |
1286 | | /// Try to sanitize a string to be used as a Redis key. |
1287 | | /// We don't actually modify the string, just check if it's valid. |
1288 | 28 | const fn try_sanitize(s: &str) -> bool { |
1289 | | // Note: We cannot use for loops or iterators here because they are not const. |
1290 | | // Allowing us to use a const function here gives the compiler the ability to |
1291 | | // optimize this function away entirely in the case where the input is constant. |
1292 | 28 | let chars = s.as_bytes(); |
1293 | 28 | let mut i: usize = 0; |
1294 | 28 | let len = s.len(); |
1295 | | loop { |
1296 | 546 | if i >= len { |
1297 | 28 | break; |
1298 | 518 | } |
1299 | 518 | let c = chars[i]; |
1300 | 518 | if !c.is_ascii_alphanumeric() && c != b'_'20 { |
1301 | 0 | return false; |
1302 | 518 | } |
1303 | 518 | i += 1; |
1304 | | } |
1305 | 28 | true |
1306 | 28 | } |
1307 | | |
1308 | | /// An individual subscription to a key in Redis. |
1309 | | #[derive(Debug)] |
1310 | | pub struct RedisSubscription { |
1311 | | receiver: Option<tokio::sync::watch::Receiver<String>>, |
1312 | | weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>, |
1313 | | } |
1314 | | |
1315 | | impl SchedulerSubscription for RedisSubscription { |
1316 | | /// Wait for the subscription key to change. |
1317 | 20 | async fn changed(&mut self) -> Result<(), Error> { |
1318 | 20 | let receiver = self |
1319 | 20 | .receiver |
1320 | 20 | .as_mut() |
1321 | 20 | .ok_or_else(|| make_err!0 (Code::Internal0 , "In RedisSubscription::changed::as_mut"))?0 ; |
1322 | 20 | receiver.changed().await.map_err17 (|err| {0 |
1323 | 0 | Error::from_std_err(Code::Internal, &err) |
1324 | 0 | .append("In RedisSubscription::changed::changed") |
1325 | 0 | }) |
1326 | 17 | } |
1327 | | } |
1328 | | |
1329 | | // If the subscription is dropped, we need to possibly remove the key from the |
1330 | | // subscribed keys map. |
1331 | | impl Drop for RedisSubscription { |
1332 | 410 | fn drop(&mut self) { |
1333 | 410 | let Some(receiver) = self.receiver.take() else { |
1334 | 0 | warn!("RedisSubscription has already been dropped, nothing to do."); |
1335 | 0 | return; // Already dropped, nothing to do. |
1336 | | }; |
1337 | 410 | let key = receiver.borrow().clone(); |
1338 | 410 | let Some(subscribed_keys408 ) = self.weak_subscribed_keys.upgrade() else { |
1339 | 2 | return; // Parent dropped — nothing to do. |
1340 | | }; |
1341 | 408 | let mut subscribed_keys = subscribed_keys.write(); |
1342 | 408 | let Some(publisher) = subscribed_keys.get(&key) else { |
1343 | 0 | warn!( |
1344 | | %key, |
1345 | | "RedisSubscription::drop: key absent from subscribed_keys under write lock — \ |
1346 | | indicates an unexpected removal path", |
1347 | | ); |
1348 | 0 | return; |
1349 | | }; |
1350 | | // Count includes our own (still-alive) receiver. If we are the |
1351 | | // sole subscriber, remove the publisher entry. |
1352 | 408 | if publisher.receiver_count() == 1 { |
1353 | 206 | subscribed_keys.remove(&key); |
1354 | 206 | }202 |
1355 | 408 | drop(receiver); |
1356 | 410 | } |
1357 | | } |
1358 | | |
1359 | | /// A publisher for a key in Redis. |
1360 | | #[derive(Debug)] |
1361 | | struct RedisSubscriptionPublisher { |
1362 | | sender: Mutex<tokio::sync::watch::Sender<String>>, |
1363 | | } |
1364 | | |
1365 | | impl RedisSubscriptionPublisher { |
1366 | 208 | fn new( |
1367 | 208 | key: String, |
1368 | 208 | weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>, |
1369 | 208 | ) -> (Self, RedisSubscription) { |
1370 | 208 | let (sender, receiver) = tokio::sync::watch::channel(key); |
1371 | 208 | let publisher = Self { |
1372 | 208 | sender: Mutex::new(sender), |
1373 | 208 | }; |
1374 | 208 | let subscription = RedisSubscription { |
1375 | 208 | receiver: Some(receiver), |
1376 | 208 | weak_subscribed_keys, |
1377 | 208 | }; |
1378 | 208 | (publisher, subscription) |
1379 | 208 | } |
1380 | | |
1381 | 202 | fn subscribe( |
1382 | 202 | &self, |
1383 | 202 | weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>, |
1384 | 202 | ) -> RedisSubscription { |
1385 | 202 | let receiver = self.sender.lock().subscribe(); |
1386 | 202 | RedisSubscription { |
1387 | 202 | receiver: Some(receiver), |
1388 | 202 | weak_subscribed_keys, |
1389 | 202 | } |
1390 | 202 | } |
1391 | | |
1392 | 408 | fn receiver_count(&self) -> usize { |
1393 | 408 | self.sender.lock().receiver_count() |
1394 | 408 | } |
1395 | | |
1396 | 18 | fn notify(&self) { |
1397 | | // TODO(https://github.com/sile/patricia_tree/issues/40) When this is addressed |
1398 | | // we can remove the `Mutex` and use the mutable iterator directly. |
1399 | 18 | self.sender.lock().send_modify(|_| {}); |
1400 | 18 | } |
1401 | | } |
1402 | | |
1403 | | #[derive(Debug, Clone)] |
1404 | | pub struct RedisSubscriptionManager { |
1405 | | subscribed_keys: Arc<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>, |
1406 | | tx_for_test: UnboundedSender<String>, |
1407 | | _subscription_spawn: Arc<Mutex<JoinHandleDropGuard<()>>>, |
1408 | | } |
1409 | | |
1410 | | impl RedisSubscriptionManager { |
1411 | 9 | pub fn new(subscriber_channel: UnboundedReceiver<PushInfo>) -> Self { |
1412 | 9 | let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new())); |
1413 | 9 | let subscribed_keys_weak = Arc::downgrade(&subscribed_keys); |
1414 | 9 | let (tx_for_test, mut rx_for_test) = unbounded_channel(); |
1415 | 9 | let mut local_subscriber_channel = UnboundedReceiverStream::new(subscriber_channel); |
1416 | | Self { |
1417 | 9 | subscribed_keys, |
1418 | 9 | tx_for_test, |
1419 | 9 | _subscription_spawn: Arc::new(Mutex::new(spawn!( |
1420 | | "redis_subscribe_spawn", |
1421 | 7 | async move { |
1422 | | loop { |
1423 | | loop { |
1424 | 23 | let key15 = select! { |
1425 | 23 | value14 = rx_for_test.recv() => { |
1426 | 14 | let Some(value) = value else { |
1427 | 0 | unreachable!("Channel should never close"); |
1428 | | }; |
1429 | 14 | value |
1430 | | }, |
1431 | 23 | maybe_push_info2 = local_subscriber_channel.next() => { |
1432 | 2 | if let Some(push_info) = maybe_push_info { |
1433 | 2 | match push_info.kind { |
1434 | 1 | redis::PushKind::PMessage => {}, |
1435 | | redis::PushKind::PSubscribe => { |
1436 | 1 | trace!(?push_info, "PSubscribe, ignore"); |
1437 | 1 | continue; |
1438 | | } |
1439 | | _ => { |
1440 | 0 | warn!(?push_info, "Other push_info message, discarded"); |
1441 | 0 | continue; |
1442 | | }, |
1443 | | } |
1444 | 1 | if push_info.data.len() != 3 { |
1445 | 0 | error!(?push_info, "Expected exactly 3 values on subscriber channel (pattern, channel, value)"); |
1446 | 0 | continue; |
1447 | 1 | } |
1448 | 1 | match push_info.data.last().unwrap() { |
1449 | 0 | Value::SimpleString(s) => { |
1450 | 0 | s.clone() |
1451 | | } |
1452 | 1 | Value::BulkString(v) => { |
1453 | 1 | String::from_utf8(v.clone()).expect("String message") |
1454 | | } |
1455 | 0 | other => { |
1456 | 0 | error!(?other, "Received non-string message in RedisSubscriptionManager"); |
1457 | 0 | continue; |
1458 | | } |
1459 | | } |
1460 | | } else { |
1461 | 0 | error!("Error receiving message in RedisSubscriptionManager from subscriber_channel"); |
1462 | 0 | break; |
1463 | | } |
1464 | | } |
1465 | | }; |
1466 | 15 | trace!(key, "New subscription manager key"); |
1467 | 15 | let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else { |
1468 | 0 | warn!( |
1469 | | "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn" |
1470 | | ); |
1471 | 0 | return; |
1472 | | }; |
1473 | 15 | let subscribed_keys_mux = subscribed_keys.read(); |
1474 | 15 | subscribed_keys_mux |
1475 | 15 | .common_prefix_values(&*key) |
1476 | 15 | .for_each(RedisSubscriptionPublisher::notify); |
1477 | | } |
1478 | | // Sleep for a small amount of time to ensure we don't reconnect too quickly. |
1479 | 0 | sleep(Duration::from_secs(1)).await; |
1480 | | // If we reconnect or lag behind we might have had dirty keys, so we need to |
1481 | | // flag all of them as changed. |
1482 | 0 | let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else { |
1483 | 0 | warn!( |
1484 | | "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn" |
1485 | | ); |
1486 | 0 | return; |
1487 | | }; |
1488 | 0 | let subscribed_keys_mux = subscribed_keys.read(); |
1489 | | // Just in case also get a new receiver. |
1490 | 0 | for publisher in subscribed_keys_mux.values() { |
1491 | 0 | publisher.notify(); |
1492 | 0 | } |
1493 | | } |
1494 | 0 | } |
1495 | | ))), |
1496 | | } |
1497 | 9 | } |
1498 | | } |
1499 | | |
1500 | | impl SubscriptionManagerNotify for RedisSubscriptionManager { |
1501 | 16 | fn notify_for_test(&self, value: String) { |
1502 | 16 | self.tx_for_test.send(value).unwrap(); |
1503 | 16 | } |
1504 | | } |
1505 | | |
1506 | | impl SchedulerSubscriptionManager for RedisSubscriptionManager { |
1507 | | type Subscription = RedisSubscription; |
1508 | | |
1509 | 410 | fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error> |
1510 | 410 | where |
1511 | 410 | K: SchedulerStoreKeyProvider, |
1512 | | { |
1513 | 410 | let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys); |
1514 | 410 | let mut subscribed_keys = self.subscribed_keys.write(); |
1515 | 410 | let key = key.get_key(); |
1516 | 410 | let key_str = key.as_str(); |
1517 | 410 | let mut subscription = if let Some(publisher202 ) = subscribed_keys.get(&key_str) { |
1518 | 202 | publisher.subscribe(weak_subscribed_keys) |
1519 | | } else { |
1520 | 208 | let (publisher, subscription) = |
1521 | 208 | RedisSubscriptionPublisher::new(key_str.to_string(), weak_subscribed_keys); |
1522 | 208 | subscribed_keys.insert(key_str, publisher); |
1523 | 208 | subscription |
1524 | | }; |
1525 | 410 | subscription |
1526 | 410 | .receiver |
1527 | 410 | .as_mut() |
1528 | 410 | .ok_or_else(|| {0 |
1529 | 0 | make_err!( |
1530 | 0 | Code::Internal, |
1531 | | "Receiver should be set in RedisSubscriptionManager::subscribe" |
1532 | | ) |
1533 | 0 | })? |
1534 | 410 | .mark_changed(); |
1535 | | |
1536 | 410 | Ok(subscription) |
1537 | 410 | } |
1538 | | |
1539 | 1 | fn is_reliable() -> bool { |
1540 | 1 | false |
1541 | 1 | } |
1542 | | } |
1543 | | |
1544 | | impl<C, M> SchedulerStore for RedisStore<C, M> |
1545 | | where |
1546 | | C: Clone + ConnectionLike + Sync + Send + 'static, |
1547 | | M: RedisManager<C> + Sync + Send + 'static, |
1548 | | { |
1549 | | type SubscriptionManager = RedisSubscriptionManager; |
1550 | | |
1551 | 6 | async fn subscription_manager(&self) -> Result<Arc<RedisSubscriptionManager>, Error> { |
1552 | 6 | self.subscription_manager |
1553 | 6 | .get_or_try_init(|| async move {3 |
1554 | 3 | let Some(subscriber_channel) = self.subscriber_channel.lock().take() else { |
1555 | 0 | return Err(make_input_err!( |
1556 | 0 | "Multiple attempts to obtain the subscription manager in RedisStore" |
1557 | 0 | )); |
1558 | | }; |
1559 | 3 | let Some(pub_sub_channel) = &self.pub_sub_channel else { |
1560 | 0 | return Err(make_input_err!( |
1561 | 0 | "RedisStore must have a pubsub for Redis Scheduler if using subscriptions" |
1562 | 0 | )); |
1563 | | }; |
1564 | 3 | self.connection_manager.psubscribe(pub_sub_channel).await?0 ; |
1565 | 3 | Ok(Arc::new(RedisSubscriptionManager::new(subscriber_channel))) |
1566 | 6 | }) |
1567 | 6 | .await |
1568 | 6 | .map(Clone::clone) |
1569 | 6 | } |
1570 | | |
1571 | 24 | async fn update_data<T>(&self, data: T, expiry: Option<Duration>) -> Result<Option<i64>, Error> |
1572 | 24 | where |
1573 | 24 | T: SchedulerStoreDataProvider |
1574 | 24 | + SchedulerStoreKeyProvider |
1575 | 24 | + SchedulerCurrentVersionProvider |
1576 | 24 | + Send, |
1577 | 24 | { |
1578 | 24 | let key = data.get_key(); |
1579 | 24 | let redis_key = self.encode_key(&key); |
1580 | 24 | let mut client = self.get_client().await?0 ; |
1581 | 24 | let maybe_index = data.get_indexes().err_tip(|| {0 |
1582 | 0 | format!("Err getting index in RedisStore::update_data::versioned for {redis_key}") |
1583 | 0 | })?; |
1584 | 24 | if <T as SchedulerStoreKeyProvider>::Versioned::VALUE { |
1585 | 18 | let current_version = data.current_version(); |
1586 | 18 | let data = data.try_into_bytes().err_tip(|| {0 |
1587 | 0 | format!("Could not convert value to bytes in RedisStore::update_data::versioned for {redis_key}") |
1588 | 0 | })?; |
1589 | 18 | let mut script = self.connection_manager.update_script(redis_key.as_ref()); |
1590 | 18 | let mut script_invocation = script |
1591 | 18 | .arg(format!("{current_version}")) |
1592 | 18 | .arg(expiry.unwrap_or(Duration::ZERO).as_secs()) |
1593 | 18 | .arg(data.to_vec()); |
1594 | 52 | for (name, value) in maybe_index18 { |
1595 | 52 | script_invocation = script_invocation.arg(name).arg(value.to_vec()); |
1596 | 52 | } |
1597 | 18 | let start = Instant::now(); |
1598 | 18 | let (success, new_version): (bool, i64) = match script_invocation |
1599 | 18 | .invoke_async(&mut client.connection_manager) |
1600 | 18 | .await |
1601 | | { |
1602 | 18 | Ok(v) => v, |
1603 | 0 | Err(err) |
1604 | 0 | if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) => |
1605 | | { |
1606 | 0 | client.reconnect(&self.connection_manager).await?; |
1607 | 0 | script_invocation |
1608 | 0 | .invoke_async(&mut client.connection_manager) |
1609 | 0 | .await |
1610 | 0 | .err_tip(|| format!("(after reconnect) In RedisStore::update_data::versioned for {key:?}"))? |
1611 | | } |
1612 | 0 | Err(err) => { |
1613 | 0 | let mut error: Error = err.into(); |
1614 | 0 | error |
1615 | 0 | .messages |
1616 | 0 | .push(format!("In RedisStore::update_data::versioned for {key:?}")); |
1617 | 0 | return Err(error); |
1618 | | } |
1619 | | }; |
1620 | | |
1621 | 18 | let elapsed = start.elapsed(); |
1622 | | |
1623 | 18 | if elapsed > Duration::from_millis(100) { |
1624 | 0 | warn!( |
1625 | | %redis_key, |
1626 | | ?elapsed, |
1627 | | "Slow Redis version-set operation" |
1628 | | ); |
1629 | 18 | } |
1630 | 18 | if !success { |
1631 | 4 | warn!( |
1632 | | %redis_key, |
1633 | | %key, |
1634 | | %current_version, |
1635 | | %new_version, |
1636 | 4 | caller = core::any::type_name::<T>(), |
1637 | | "Redis version conflict - optimistic lock failed" |
1638 | | ); |
1639 | 4 | return Ok(None); |
1640 | 14 | } |
1641 | 14 | trace!( |
1642 | | %redis_key, |
1643 | | %key, |
1644 | | old_version = %current_version, |
1645 | | %new_version, |
1646 | | "Updated redis key to new version" |
1647 | | ); |
1648 | | // If we have a publish channel configured, send a notice that the key has been set. |
1649 | 14 | if let Some(pub_sub_channel12 ) = &self.pub_sub_channel { |
1650 | 12 | return Ok(client |
1651 | 12 | .connection_manager |
1652 | 12 | .publish(pub_sub_channel, redis_key.as_ref()) |
1653 | 12 | .await?0 ); |
1654 | 2 | } |
1655 | 2 | Ok(Some(new_version)) |
1656 | | } else { |
1657 | 6 | let data = data.try_into_bytes().err_tip(|| {0 |
1658 | 0 | format!("Could not convert value to bytes in RedisStore::update_data::noversion for {redis_key}") |
1659 | 0 | })?; |
1660 | 6 | let mut fields: Vec<(String, _)> = vec![]; |
1661 | 6 | fields.push((DATA_FIELD_NAME.into(), data.to_vec())); |
1662 | 6 | for (name, value) in maybe_index { |
1663 | 6 | fields.push((name.into(), value.to_vec())); |
1664 | 6 | } |
1665 | 6 | match client |
1666 | 6 | .connection_manager |
1667 | 6 | .hset_multiple::<_, _, _, ()>(redis_key.as_ref(), &fields) |
1668 | 6 | .await |
1669 | | { |
1670 | 5 | Ok(_v) => { |
1671 | 5 | if let Some(expiry_v2 ) = expiry { |
1672 | 2 | let seconds = |
1673 | 2 | TryInto::<i64>::try_into(expiry_v.as_secs()).err_tip(|| {0 |
1674 | 0 | format!("Expiry seconds doesn't map to i64: {expiry_v:#?}") |
1675 | 0 | })?; |
1676 | 2 | let expiry_result: u8 = client |
1677 | 2 | .connection_manager |
1678 | 2 | .expire(redis_key.as_ref(), seconds) |
1679 | 2 | .await |
1680 | 2 | .err_tip(|| {0 |
1681 | 0 | format!( |
1682 | | "In RedisStore::update_data::noversion (expiry) for {redis_key}" |
1683 | | ) |
1684 | 0 | })?; |
1685 | 2 | if expiry_result != 1 { |
1686 | 1 | warn!(%redis_key, seconds, "Wasn't able to set expiry for Redis key"); |
1687 | 1 | } |
1688 | 3 | } |
1689 | | } |
1690 | 1 | Err(err) |
1691 | 1 | if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) => |
1692 | | { |
1693 | 1 | client.reconnect(&self.connection_manager).await?0 ; |
1694 | 1 | client |
1695 | 1 | .connection_manager |
1696 | 1 | .hset_multiple::<_, _, _, ()>(redis_key.as_ref(), &fields) |
1697 | 1 | .await |
1698 | 1 | .err_tip(|| format!0 ("(after reconnect) In RedisStore::update_data::noversion (hset) for {redis_key}"))?0 ; |
1699 | 1 | if let Some(expiry_v) = expiry { |
1700 | 1 | let seconds = |
1701 | 1 | TryInto::<i64>::try_into(expiry_v.as_secs()).err_tip(|| {0 |
1702 | 0 | format!("Expiry seconds doesn't map to i64: {expiry_v:#?}") |
1703 | 0 | })?; |
1704 | 1 | let expiry_result: u8 = client.connection_manager.expire(redis_key.as_ref(), seconds).await |
1705 | 1 | .err_tip(|| format!0 ("(after reconnect) In RedisStore::update_data::noversion (expiry) for {redis_key}"))?0 ; |
1706 | 1 | if expiry_result != 1 { |
1707 | 0 | warn!(%redis_key, seconds, "Wasn't able to set expiry for Redis key"); |
1708 | 1 | } |
1709 | 0 | } |
1710 | | } |
1711 | 0 | Err(err) => { |
1712 | 0 | let mut error: Error = err.into(); |
1713 | 0 | error.messages.push(format!( |
1714 | | "In RedisStore::update_data::noversion for {redis_key}" |
1715 | | )); |
1716 | 0 | return Err(error); |
1717 | | } |
1718 | | } |
1719 | | // If we have a publish channel configured, send a notice that the key has been set. |
1720 | 6 | if let Some(pub_sub_channel3 ) = &self.pub_sub_channel { |
1721 | 3 | return Ok(client |
1722 | 3 | .connection_manager |
1723 | 3 | .publish(pub_sub_channel, redis_key.as_ref()) |
1724 | 3 | .await?0 ); |
1725 | 3 | } |
1726 | 3 | Ok(Some(0)) // Always use "0" version since this is not a versioned request. |
1727 | | } |
1728 | 24 | } |
1729 | | |
1730 | 28 | async fn search_by_index_prefix<K>( |
1731 | 28 | &self, |
1732 | 28 | index: K, |
1733 | 28 | ) -> Result< |
1734 | 28 | impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send, |
1735 | 28 | Error, |
1736 | 28 | > |
1737 | 28 | where |
1738 | 28 | K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send, |
1739 | 28 | { |
1740 | 28 | let index_value = index.index_value(); |
1741 | 28 | try_sanitize(index_value.as_ref()) |
1742 | 28 | .then_some(()) |
1743 | 28 | .err_tip(|| {0 |
1744 | 0 | format!("In RedisStore::search_by_index_prefix::try_sanitize - {index_value:?}") |
1745 | 0 | })?; |
1746 | 31 | let run_ft_aggregate28 = |connection_manager: C| async { |
1747 | 31 | ft_aggregate( |
1748 | 31 | connection_manager, |
1749 | 31 | format!( |
1750 | | "{}", |
1751 | 31 | get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY) |
1752 | | ), |
1753 | 31 | if index_value.is_empty() { |
1754 | 2 | "*".to_string() |
1755 | | } else { |
1756 | 29 | format!("@{}:{{ {} }}", K::INDEX_NAME, index_value) |
1757 | | }, |
1758 | | FtAggregateOptions { |
1759 | 31 | load: vec![DATA_FIELD_NAME.into(), VERSION_FIELD_NAME.into()], |
1760 | 31 | cursor: FtAggregateCursor { |
1761 | 31 | count: self.max_count_per_cursor, |
1762 | 31 | max_idle: CURSOR_IDLE_MS, |
1763 | 31 | }, |
1764 | 31 | sort_by: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| vec!27 [format!27 ("@{v}")]), |
1765 | | }, |
1766 | | ) |
1767 | 31 | .await |
1768 | 62 | }; |
1769 | 28 | let run_ft_create = |connection_manager: C| async {3 |
1770 | 3 | let mut schema = vec![SearchSchema { |
1771 | 3 | field_name: K::INDEX_NAME.into(), |
1772 | 3 | sortable: false, |
1773 | 3 | }]; |
1774 | 3 | if let Some(sort_key) = K::MAYBE_SORT_KEY { |
1775 | 3 | schema.push(SearchSchema { |
1776 | 3 | field_name: sort_key.into(), |
1777 | 3 | sortable: true, |
1778 | 3 | }); |
1779 | 3 | }0 |
1780 | 3 | let create_options = FtCreateOptions { |
1781 | 3 | prefixes: vec![K::KEY_PREFIX.into()], |
1782 | 3 | nohl: true, |
1783 | 3 | nofields: true, |
1784 | 3 | nofreqs: true, |
1785 | 3 | nooffsets: true, |
1786 | 3 | temporary: Some(INDEX_TTL_S), |
1787 | 3 | }; |
1788 | 3 | let index = format!( |
1789 | | "{}", |
1790 | 3 | get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY) |
1791 | | ); |
1792 | 3 | ft_create(connection_manager, index, create_options, schema).await |
1793 | 6 | }; |
1794 | | |
1795 | 28 | let (connection_manager, connect_id) = self.connection_manager.get_connection().await?0 ; |
1796 | 28 | let stream25 = match run_ft_aggregate(connection_manager.clone()).await { |
1797 | 0 | Err(err) |
1798 | 3 | if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly)0 => |
1799 | | { |
1800 | 0 | let (connection_manager, _connect_id) = |
1801 | 0 | self.connection_manager.reconnect(connect_id).await?; |
1802 | 0 | run_ft_aggregate(connection_manager).await.err_tip(|| { |
1803 | 0 | format!( |
1804 | | "Error with reconnected ft_aggregate in RedisStore::search_by_index_prefix({})", |
1805 | 0 | get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY), |
1806 | | ) |
1807 | 0 | }) |
1808 | | } |
1809 | | Err(_) => { |
1810 | 3 | let (connection_manager, result) = |
1811 | 3 | match run_ft_create(connection_manager.clone()).await { |
1812 | 0 | Err(err) |
1813 | 3 | if err.kind() |
1814 | 0 | == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) => |
1815 | | { |
1816 | 0 | let (connection_manager, _connect_id) = |
1817 | 0 | self.connection_manager.reconnect(connect_id).await?; |
1818 | | ( |
1819 | 0 | connection_manager.clone(), |
1820 | 0 | run_ft_create(connection_manager).await, |
1821 | | ) |
1822 | | } |
1823 | 3 | result => (connection_manager, result), |
1824 | | }; |
1825 | | |
1826 | | // RediSearch returns ErrorKind::Extension with code "Index" |
1827 | | // and detail along the lines of "Index already exists" when |
1828 | | // FT.CREATE races with another node. |
1829 | 3 | let create_result = result.or_else(|e| { |
1830 | 3 | let is_already_exists = e.kind() == redis::ErrorKind::Extension |
1831 | 2 | && e.code() == Some("Index") |
1832 | 1 | && e.detail() |
1833 | 1 | .is_some_and(|d| d.to_ascii_lowercase().contains("already exists")); |
1834 | 3 | if is_already_exists { |
1835 | 1 | Ok(()) |
1836 | | } else { |
1837 | 2 | Err(e).err_tip(|| { |
1838 | 2 | format!( |
1839 | | "Error with ft_create in RedisStore::search_by_index_prefix({})", |
1840 | 2 | get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY), |
1841 | | ) |
1842 | 2 | }) |
1843 | | } |
1844 | 3 | }); |
1845 | | |
1846 | 3 | let run_result = run_ft_aggregate(connection_manager).await.err_tip(|| { |
1847 | 3 | format!( |
1848 | | "Error with second ft_aggregate in RedisStore::search_by_index_prefix({})", |
1849 | 3 | get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY), |
1850 | | ) |
1851 | 3 | }); |
1852 | | |
1853 | | // Creating the index will race which is ok. If it fails to create, we only |
1854 | | // error if the second ft_aggregate call fails and fails to create. |
1855 | 3 | run_result.or_else(move |e| create_result.merge(Err(e))) |
1856 | | } |
1857 | 25 | Ok(stream) => Ok(stream), |
1858 | 3 | }?; |
1859 | | |
1860 | 25 | Ok(stream.filter_map(|result| async move {16 |
1861 | 16 | let raw_redis_map = match result { |
1862 | 16 | Ok(v) => v, |
1863 | 0 | Err(e) => { |
1864 | | return Some( |
1865 | 0 | Err(Error::from(e)) |
1866 | 0 | .err_tip(|| "Error in stream of in RedisStore::search_by_index_prefix"), |
1867 | | ); |
1868 | | } |
1869 | | }; |
1870 | | |
1871 | 16 | if matches!15 (raw_redis_map, Value::Int(_)) { |
1872 | 1 | return None; |
1873 | 15 | } |
1874 | | |
1875 | 15 | let Some(redis_map) = raw_redis_map.as_sequence() else { |
1876 | 0 | return Some(Err(Error::new( |
1877 | 0 | Code::Internal, |
1878 | 0 | format!("Non-array from ft_aggregate: {raw_redis_map:?}"), |
1879 | 0 | ))); |
1880 | | }; |
1881 | 15 | let mut redis_map_iter = redis_map.iter(); |
1882 | 15 | let mut bytes_data: Option<Bytes> = None; |
1883 | 15 | let mut version: Option<i64> = None; |
1884 | 46 | while let Some(key31 ) = redis_map_iter.next() { |
1885 | 31 | let value = redis_map_iter.next().unwrap(); |
1886 | 31 | let Value::BulkString(k) = key else { |
1887 | 0 | return Some(Err(Error::new( |
1888 | 0 | Code::Internal, |
1889 | 0 | format!("Non-BulkString key from ft_aggregate: {key:?}"), |
1890 | 0 | ))); |
1891 | | }; |
1892 | 31 | let Ok(str_key) = str::from_utf8(k) else { |
1893 | 0 | return Some(Err(Error::new( |
1894 | 0 | Code::Internal, |
1895 | 0 | format!("Non-utf8 key from ft_aggregate: {key:?}"), |
1896 | 0 | ))); |
1897 | | }; |
1898 | 31 | let Value::BulkString(v) = value else { |
1899 | 0 | return Some(Err(Error::new( |
1900 | 0 | Code::Internal, |
1901 | 0 | format!("Non-BulkString value from ft_aggregate: {key:?}"), |
1902 | 0 | ))); |
1903 | | }; |
1904 | 31 | match str_key { |
1905 | 31 | DATA_FIELD_NAME => { |
1906 | 15 | bytes_data = Some(v.clone().into()); |
1907 | 15 | } |
1908 | 16 | VERSION_FIELD_NAME => { |
1909 | 15 | let Ok(str_v) = str::from_utf8(v) else { |
1910 | 0 | return Some(Err(Error::new( |
1911 | 0 | Code::Internal, |
1912 | 0 | format!("Non-utf8 version value from ft_aggregate: {v:?}"), |
1913 | 0 | ))); |
1914 | | }; |
1915 | 15 | let Ok(raw_version) = str_v.parse::<i64>() else { |
1916 | 0 | return Some(Err(Error::new( |
1917 | 0 | Code::Internal, |
1918 | 0 | format!("Non-integer version value from ft_aggregate: {str_v:?}"), |
1919 | 0 | ))); |
1920 | | }; |
1921 | 15 | version = Some(raw_version); |
1922 | | } |
1923 | 1 | other => { |
1924 | 1 | if K::MAYBE_SORT_KEY == Some(other) { |
1925 | 1 | // ignore sort keys |
1926 | 1 | } else { |
1927 | 0 | return Some(Err(Error::new( |
1928 | 0 | Code::Internal, |
1929 | 0 | format!("Extra keys from ft_aggregate: {other}"), |
1930 | 0 | ))); |
1931 | | } |
1932 | | } |
1933 | | } |
1934 | | } |
1935 | 15 | let Some(found_bytes_data) = bytes_data else { |
1936 | 0 | return Some(Err(Error::new( |
1937 | 0 | Code::Internal, |
1938 | 0 | format!("Missing '{DATA_FIELD_NAME}' in ft_aggregate, got: {raw_redis_map:?}"), |
1939 | 0 | ))); |
1940 | | }; |
1941 | | Some( |
1942 | 15 | K::decode(version.unwrap_or(0), found_bytes_data) |
1943 | 15 | .err_tip(|| "In RedisStore::search_by_index_prefix::decode"), |
1944 | | ) |
1945 | 32 | })) |
1946 | 28 | } |
1947 | | |
1948 | 94 | async fn get_and_decode<K>( |
1949 | 94 | &self, |
1950 | 94 | key: K, |
1951 | 94 | ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error> |
1952 | 94 | where |
1953 | 94 | K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send, |
1954 | 94 | { |
1955 | 94 | let key = key.get_key(); |
1956 | 94 | let key = self.encode_key(&key); |
1957 | 94 | let mut client = self.get_client().await?0 ; |
1958 | 94 | let results: Vec<Value> = client |
1959 | 94 | .connection_manager |
1960 | 94 | .hmget::<_, Vec<String>, Vec<Value>>( |
1961 | 94 | key.as_ref(), |
1962 | 94 | vec![VERSION_FIELD_NAME.into(), DATA_FIELD_NAME.into()], |
1963 | 94 | ) |
1964 | 94 | .await |
1965 | 94 | .err_tip(|| format!0 ("In RedisStore::get_without_version::notversioned {key}"))?0 ; |
1966 | 94 | let Some(Value::BulkString(data48 )) = results.get(1) else { |
1967 | 46 | return Ok(None); |
1968 | | }; |
1969 | | #[allow(clippy::get_first)] |
1970 | 48 | let version = if let Some(raw_v) = results.get(0) { |
1971 | 48 | match raw_v { |
1972 | 0 | Value::Int(v) => *v, |
1973 | 46 | Value::BulkString(v) => i64::from_str(str::from_utf8(v).expect("utf-8 bulkstring")) |
1974 | 46 | .expect("integer bulkstring"), |
1975 | 2 | Value::Nil => 0, |
1976 | | _ => { |
1977 | 0 | warn!(?raw_v, "Non-integer version!"); |
1978 | 0 | 0 |
1979 | | } |
1980 | | } |
1981 | | } else { |
1982 | 0 | 0 |
1983 | | }; |
1984 | | Ok(Some( |
1985 | 48 | K::decode(version, Bytes::from(data.clone())).err_tip(|| {0 |
1986 | 0 | format!("In RedisStore::get_with_version::notversioned::decode {key}") |
1987 | 0 | })?, |
1988 | | )) |
1989 | 94 | } |
1990 | | } |