/build/source/nativelink-store/src/redis_store.rs
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// See LICENSE file for details
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::ops::{Bound, RangeBounds};
use core::pin::Pin;
use core::time::Duration;
use core::{cmp, iter};
use std::borrow::Cow;
use std::sync::{Arc, Weak};

use async_trait::async_trait;
use bytes::Bytes;
use const_format::formatcp;
use fred::clients::SubscriberClient;
use fred::interfaces::{ClientLike, KeysInterface, PubsubInterface};
use fred::prelude::{Client, EventInterface, HashesInterface, RediSearchInterface};
use fred::types::config::{
    Config as RedisConfig, ConnectionConfig, PerformanceConfig, ReconnectPolicy, UnresponsiveConfig,
};
use fred::types::redisearch::{
    AggregateOperation, FtAggregateOptions, FtCreateOptions, IndexKind, Load, SearchField,
    SearchSchema, SearchSchemaKind, WithCursor,
};
use fred::types::scan::Scanner;
use fred::types::scripts::Script;
use fred::types::{Builder, Key as RedisKey, Map as RedisMap, SortOrder, Value as RedisValue};
use futures::stream::FuturesUnordered;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt, future};
use itertools::izip;
use nativelink_config::stores::{RedisMode, RedisSpec};
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
use nativelink_metric::MetricsComponent;
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
use nativelink_util::spawn;
use nativelink_util::store_trait::{
    BoolValue, RemoveItemCallback, SchedulerCurrentVersionProvider, SchedulerIndexProvider,
    SchedulerStore, SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
    SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,
};
use nativelink_util::task::JoinHandleDropGuard;
use parking_lot::{Mutex, RwLock};
use patricia_tree::StringPatriciaMap;
use tokio::select;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::sleep;
use tracing::{error, info, trace, warn};
use uuid::Uuid;

use crate::cas_utils::is_zero_digest;
use crate::redis_utils::ft_aggregate;

/// The default size of the read chunk when reading data from Redis.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;

/// The default size of the connection pool if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_CONNECTION_POOL_SIZE: usize = 3;

/// The default delay between retries if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_RETRY_DELAY: f32 = 0.1;
/// The amount of jitter to add to the retry delay if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_RETRY_JITTER: f32 = 0.5;

/// The default maximum capacity of the broadcast channel if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_BROADCAST_CHANNEL_CAPACITY: usize = 4096;

/// The default connection timeout in milliseconds if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;

/// The default command timeout in milliseconds if not specified.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;

/// The default maximum number of chunk uploads per update.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE: usize = 10;

/// The default COUNT value passed when scanning keys in Redis.
/// Note: If this changes it should be updated in the config documentation.
const DEFAULT_SCAN_COUNT: u32 = 10_000;

/// The default maximum number of in-flight client permits if not specified.
const DEFAULT_CLIENT_PERMITS: usize = 500;

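/// A round-robin pool of Redis clients in which a client that fails to
/// connect can be rebuilt from the stored `Builder` and swapped in place.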
#[derive(Clone, Debug)]
pub struct RecoverablePool {
    clients: Arc<RwLock<Vec<Client>>>,
    builder: Builder,
    counter: Arc<core::sync::atomic::AtomicUsize>,
}

impl RecoverablePool {
    pub fn new(builder: Builder, size: usize) -> Result<Self, Error> {
        let mut clients = Vec::with_capacity(size);
        for _ in 0..size {
            let client = builder
                .build()
                .err_tip(|| "Failed to build client in RecoverablePool::new")?;
            clients.push(client);
        }
        Ok(Self {
            clients: Arc::new(RwLock::new(clients)),
            builder,
            counter: Arc::new(core::sync::atomic::AtomicUsize::new(0)),
        })
    }

    fn connect(&self) {
        let clients = self.clients.read();
        for client in clients.iter() {
            client.connect();
        }
    }

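    /// Returns the next pooled client in round-robin order.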
    fn next(&self) -> Client {
        let clients = self.clients.read();
        let index = self
            .counter
            .fetch_add(1, core::sync::atomic::Ordering::Relaxed);
        clients[index % clients.len()].clone()
    }

    async fn replace_client(&self, old_client: &Client) -> Result<Client, Error> {
        {
            let clients = self.clients.read();
            if !clients.iter().any(|c| c.id() == old_client.id()) {
                // Someone else swapped this client already; just hand out the next pooled one.
                return Ok(self.next());
            }
        }

        let new_client = self
            .builder
            .build()
            .err_tip(|| "Failed to build new client in RecoverablePool::replace_client")?;
        new_client.connect();
        new_client.wait_for_connect().await.err_tip(|| {
            format!(
                "Failed to connect new client while replacing Redis client {}",
                old_client.id()
            )
        })?;

        let replaced_client = {
            let mut clients = self.clients.write();
            clients
                .iter()
                .position(|c| c.id() == old_client.id())
                .map(|index| core::mem::replace(&mut clients[index], new_client.clone()))
        };

        if let Some(old_client) = replaced_client {
            let _unused = old_client.quit().await;
            info!("Replaced Redis client {}", old_client.id());
            Ok(new_client)
        } else {
            // Second race: the pool entry changed after we connected the new client.
            let _unused = new_client.quit().await;
            Ok(self.next())
        }
    }
}

/// A [`StoreDriver`] implementation that uses Redis as a backing store.
#[derive(Debug, MetricsComponent)]
pub struct RedisStore {
    /// The client pool connecting to the backing Redis instance(s).
    client_pool: RecoverablePool,

    /// A channel to publish updates to when a key is added, removed, or modified.
    #[metric(
        help = "The pubsub channel to publish updates to when a key is added, removed, or modified"
    )]
    pub_sub_channel: Option<String>,

    /// A redis client for managing subscriptions.
    /// TODO: This should be moved into the store in followups once a standard use pattern has been determined.
    subscriber_client: SubscriberClient,

    /// A function used to generate names for temporary keys.
    temp_name_generator_fn: fn() -> String,

    /// A common prefix to prepend to all keys before they are sent to Redis.
    ///
    /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`).
    #[metric(help = "Prefix to prepend to all keys before sending to Redis")]
    key_prefix: String,

    /// The amount of data to read from Redis at a time.
    #[metric(help = "The amount of data to read from Redis at a time")]
    read_chunk_size: usize,

    /// The maximum number of chunk uploads per update.
    /// This is used to limit the number of chunk uploads per update to prevent
    /// too many concurrent requests from overwhelming Redis.
    #[metric(help = "The maximum number of chunk uploads per update")]
    max_chunk_uploads_per_update: usize,

    /// The COUNT value passed when scanning keys in Redis.
    /// This is used to hint the amount of work that should be done per response.
    #[metric(help = "The COUNT value passed when scanning keys in Redis")]
    scan_count: u32,

    /// Redis script used to update a value in redis if the version matches.
    /// This is done by incrementing the version number and then setting the new data
    /// only if the version number matches the existing version number.
    update_if_version_matches_script: Script,

    /// A manager for subscriptions to keys in Redis.
    subscription_manager: Mutex<Option<Arc<RedisSubscriptionManager>>>,

    /// Permits to limit inflight Redis requests. Technically this only
    /// limits the calls to `get_client()`, but the requests per client
    /// are small enough that it works well enough.
    client_permits: Arc<Semaphore>,
}

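/// A pooled client bundled with a semaphore permit so that in-flight
/// Redis requests stay within the configured limit.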
struct ClientWithPermit {
    client: Client,

    // Held so the permit lives exactly as long as the client and is only
    // released when the client is dropped.
    #[allow(dead_code)]
    semaphore_permit: OwnedSemaphorePermit,
}

impl Drop for ClientWithPermit {
    fn drop(&mut self) {
        trace!(
            remaining = self.semaphore_permit.semaphore().available_permits(),
            "Dropping a client permit"
        );
    }
}

impl RedisStore {
    /// Create a new `RedisStore` from the given configuration.
    pub fn new(mut spec: RedisSpec) -> Result<Arc<Self>, Error> {
        if spec.addresses.is_empty() {
            return Err(make_err!(
                Code::InvalidArgument,
                "No addresses were specified in redis store configuration."
            ));
        }
        let [addr] = spec.addresses.as_slice() else {
            return Err(make_err!(
                Code::Unimplemented,
                "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discovery to find the other nodes."
            ));
        };
        let redis_config = match spec.mode {
            RedisMode::Cluster => RedisConfig::from_url_clustered(addr),
            RedisMode::Sentinel => RedisConfig::from_url_sentinel(addr),
            RedisMode::Standard => RedisConfig::from_url_centralized(addr),
        }
        .err_tip_with_code(|e| {
            (
                Code::InvalidArgument,
                format!("while parsing redis node address: {e}"),
            )
        })?;

        let reconnect_policy = {
            if spec.retry.delay == 0.0 {
                spec.retry.delay = DEFAULT_RETRY_DELAY;
            }
            if spec.retry.jitter == 0.0 {
                spec.retry.jitter = DEFAULT_RETRY_JITTER;
            }

            let to_ms = |secs: f32| -> u32 {
                Duration::from_secs_f32(secs)
                    .as_millis()
                    .try_into()
                    .unwrap_or(u32::MAX)
            };

            let max_retries = u32::try_from(spec.retry.max_retries)
                .err_tip(|| "max_retries could not be converted to u32 in RedisStore::new")?;

            let min_delay_ms = to_ms(spec.retry.delay);
            let max_delay_ms = 8000;
            let jitter = to_ms(spec.retry.jitter * spec.retry.delay);

            let mut reconnect_policy =
                ReconnectPolicy::new_exponential(max_retries, min_delay_ms, max_delay_ms, 2);
            reconnect_policy.set_jitter(jitter);
            reconnect_policy
        };

        {
            if spec.broadcast_channel_capacity == 0 {
                spec.broadcast_channel_capacity = DEFAULT_BROADCAST_CHANNEL_CAPACITY;
            }
            if spec.connection_timeout_ms == 0 {
                spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS;
            }
            if spec.command_timeout_ms == 0 {
                spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS;
            }
            if spec.connection_pool_size == 0 {
                spec.connection_pool_size = DEFAULT_CONNECTION_POOL_SIZE;
            }
            if spec.read_chunk_size == 0 {
                spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
            }
            if spec.max_chunk_uploads_per_update == 0 {
                spec.max_chunk_uploads_per_update = DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE;
            }
            if spec.scan_count == 0 {
                spec.scan_count = DEFAULT_SCAN_COUNT;
            }
            if spec.max_client_permits == 0 {
                spec.max_client_permits = DEFAULT_CLIENT_PERMITS;
            }
        }
        let connection_timeout = Duration::from_millis(spec.connection_timeout_ms);
        let command_timeout = Duration::from_millis(spec.command_timeout_ms);

        let mut builder = Builder::from_config(redis_config);
        builder
            .set_performance_config(PerformanceConfig {
                default_command_timeout: command_timeout,
                broadcast_channel_capacity: spec.broadcast_channel_capacity,
                ..Default::default()
            })
            .set_connection_config(ConnectionConfig {
                connection_timeout,
                internal_command_timeout: command_timeout,
                unresponsive: UnresponsiveConfig {
                    max_timeout: Some(connection_timeout),
                    // This number needs to be less than the connection timeout.
                    // We use 4 as it is a good balance between not spamming the server
                    // and not waiting too long.
                    interval: connection_timeout / 4,
                },
                ..Default::default()
            })
            .set_policy(reconnect_policy);

        let client_pool = RecoverablePool::new(builder.clone(), spec.connection_pool_size)
            .err_tip(|| "while creating redis connection pool")?;

        let subscriber_client = builder
            .build_subscriber_client()
            .err_tip(|| "while creating redis subscriber client")?;

        Self::new_from_builder_and_parts(
            client_pool,
            subscriber_client,
            spec.experimental_pub_sub_channel.clone(),
            || Uuid::new_v4().to_string(),
            spec.key_prefix.clone(),
            spec.read_chunk_size,
            spec.max_chunk_uploads_per_update,
            spec.scan_count,
            spec.max_client_permits,
        )
        .map(Arc::new)
    }

    /// Used for testing when determinism is required.
    #[expect(clippy::too_many_arguments)]
    pub fn new_from_builder_and_parts(
        client_pool: RecoverablePool,
        subscriber_client: SubscriberClient,
        pub_sub_channel: Option<String>,
        temp_name_generator_fn: fn() -> String,
        key_prefix: String,
        read_chunk_size: usize,
        max_chunk_uploads_per_update: usize,
        scan_count: u32,
        max_client_permits: usize,
    ) -> Result<Self, Error> {
        // Start the connection pool (this will retry forever by default).
        client_pool.connect();
        subscriber_client.connect();

        info!("Redis index fingerprint: {FINGERPRINT_CREATE_INDEX_HEX}");

        Ok(Self {
            client_pool,
            pub_sub_channel,
            subscriber_client,
            temp_name_generator_fn,
            key_prefix,
            read_chunk_size,
            max_chunk_uploads_per_update,
            scan_count,
            update_if_version_matches_script: Script::from_lua(LUA_VERSION_SET_SCRIPT),
            subscription_manager: Mutex::new(None),
            client_permits: Arc::new(Semaphore::new(max_client_permits)),
        })
    }

    async fn get_client(&self) -> Result<ClientWithPermit, Error> {
        let mut client = self.client_pool.next();
        loop {
            let config = client.client_config();
            if config.mocks.is_some() {
                break;
            }
            let connection_info = format!(
                "Connection issue connecting to redis server with hosts: {:?}, username: {}, database: {}",
                config
                    .server
                    .hosts()
                    .iter()
                    .map(|s| format!("{}:{}", s.host, s.port))
                    .collect::<Vec<String>>(),
                config
                    .username
                    .clone()
                    .unwrap_or_else(|| "None".to_string()),
                config.database.unwrap_or_default()
            );
            match client.wait_for_connect().await {
                Ok(()) => break,
                Err(e) => {
                    warn!("{connection_info}: {e:?}. Replacing client.");
                    client = self
                        .client_pool
                        .replace_client(&client)
                        .await
                        .err_tip(|| connection_info.clone())?;
                }
            }
        }
        let local_client_permits = self.client_permits.clone();
        let remaining = local_client_permits.available_permits();
        let semaphore_permit = local_client_permits.acquire_owned().await?;
        trace!(remaining, "Got a client permit");
        Ok(ClientWithPermit {
            client,
            semaphore_permit,
        })
    }

    /// Encode a [`StoreKey`] so it can be sent to Redis.
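    /// For example (illustrative values): with `key_prefix = "prefix/"`, the
    /// key `"abc"` is sent to Redis as `"prefix/abc"`.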
    fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> {
        let key_body = key.as_str();
        if self.key_prefix.is_empty() {
            key_body
        } else {
            // This is in the hot path for all redis operations, so we try to reuse the allocation
            // from `key.as_str()` if possible.
            match key_body {
                Cow::Owned(mut encoded_key) => {
                    encoded_key.insert_str(0, &self.key_prefix);
                    Cow::Owned(encoded_key)
                }
                Cow::Borrowed(body) => {
                    let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());
                    encoded_key.push_str(&self.key_prefix);
                    encoded_key.push_str(body);
                    Cow::Owned(encoded_key)
                }
            }
        }
    }
}

#[async_trait]
impl StoreDriver for RedisStore {
    async fn has_with_results(
        self: Pin<&Self>,
        keys: &[StoreKey<'_>],
        results: &mut [Option<u64>],
    ) -> Result<(), Error> {
        // TODO(palfrey) We could use pipeline here, but it makes retry more
        // difficult and it doesn't work very well in cluster mode.
        // If we wanted to optimize this with pipeline be careful to
        // implement retry and to support cluster mode.

        let client = self.get_client().await?;

        // If we ask for many keys in one go, this can time out, so limit that.
        let max_in_one_go = Arc::new(Semaphore::const_new(5));

        izip!(
            keys.iter(),
            results.iter_mut(),
            iter::repeat(&max_in_one_go),
            iter::repeat(&client)
        )
        .map(|(key, result, local_semaphore, client)| async move {
            // We need to do a special pass to ensure our zero key exists.
            if is_zero_digest(key.borrow()) {
                *result = Some(0);
                return Ok::<_, Error>(());
            }
            let encoded_key = self.encode_key(key);

            let guard = local_semaphore.acquire().await?;

            let pipeline = client.client.pipeline();
            pipeline
                .strlen::<(), _>(encoded_key.as_ref())
                .await
                .err_tip(|| format!("In RedisStore::has_with_results::strlen for {encoded_key}"))?;
            // Redis returns 0 when the key doesn't exist
            // AND when the key exists with a value of length 0.
            // Therefore, we need to check both length and existence
            // and do it in a pipeline for efficiency.
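            // For example (illustrative): an existing empty blob yields
            // (strlen = 0, exists = true) -> Some(0), while a missing key
            // yields (strlen = 0, exists = false) -> None.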
            pipeline
                .exists::<(), _>(encoded_key.as_ref())
                .await
                .err_tip(|| format!("In RedisStore::has_with_results::exists for {encoded_key}"))?;
            let (blob_len, exists) = pipeline
                .all::<(u64, bool)>()
                .await
                .err_tip(|| "In RedisStore::has_with_results::all")?;

            *result = if exists { Some(blob_len) } else { None };

            drop(guard);

            Ok::<_, Error>(())
        })
        .collect::<FuturesUnordered<_>>()
        .try_collect()
        .await
    }

    async fn list(
        self: Pin<&Self>,
        range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
        handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
    ) -> Result<u64, Error> {
        let range = (
            range.0.map(StoreKey::into_owned),
            range.1.map(StoreKey::into_owned),
        );
        let pattern = match range.0 {
            Bound::Included(ref start) | Bound::Excluded(ref start) => match range.1 {
                Bound::Included(ref end) | Bound::Excluded(ref end) => {
                    let start = start.as_str();
                    let end = end.as_str();
                    let max_length = start.len().min(end.len());
                    let length = start
                        .chars()
                        .zip(end.chars())
                        .position(|(a, b)| a != b)
                        .unwrap_or(max_length);
                    format!("{}{}*", self.key_prefix, &start[..length])
                }
                Bound::Unbounded => format!("{}*", self.key_prefix),
            },
            Bound::Unbounded => format!("{}*", self.key_prefix),
        };
        let client = self.get_client().await?;
        let mut scan_stream = client.client.scan(pattern, Some(self.scan_count), None);
        let mut iterations = 0;
        'outer: while let Some(mut page) = scan_stream.try_next().await? {
            if let Some(keys) = page.take_results() {
                for key in keys {
                    // TODO: Notification of conversion errors.
                    // Any results that do not conform to expectations are ignored.
                    if let Some(key) = key.as_str() {
                        if let Some(key) = key.strip_prefix(&self.key_prefix) {
                            let key = StoreKey::new_str(key);
                            if range.contains(&key) {
                                iterations += 1;
                                if !handler(&key) {
                                    break 'outer;
                                }
                            }
                        }
                    }
                }
            }
            page.next();
        }
        Ok(iterations)
    }

    async fn update(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        mut reader: DropCloserReadHalf,
        _upload_size: UploadSizeInfo,
    ) -> Result<(), Error> {
        let final_key = self.encode_key(&key);

        // While the name generation function can be supplied by the user, we need to have the curly
        // braces in place in order to manage redis' hashing behavior and make sure that the temporary
        // key name and the final key name are directed to the same cluster node. See
        // https://redis.io/blog/redis-clustering-best-practices-with-keys/
        //
        // The TL;DR is that if we're in cluster mode and the names hash differently, we can't use request
        // pipelining. By using these braces, we tell redis to only hash the part of the temporary key that's
        // identical to the final key -- so they will always hash to the same node.
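        // For example (illustrative): a temporary key like
        // "temp-<uuid>-{prefix/abc}" hashes only on "prefix/abc", so it is
        // placed on the same node as the final key "prefix/abc".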
        let temp_key = format!(
            "temp-{}-{{{}}}",
            (self.temp_name_generator_fn)(),
            &final_key
        );

        if is_zero_digest(key.borrow()) {
            let chunk = reader
                .peek()
                .await
                .err_tip(|| "Failed to peek in RedisStore::update")?;
            if chunk.is_empty() {
                reader
                    .drain()
                    .await
                    .err_tip(|| "Failed to drain in RedisStore::update")?;
                // Zero-digest keys are special -- we don't need to do anything with them.
                return Ok(());
            }
        }

        let client = self.get_client().await?;

        let mut read_stream = reader
            .scan(0u32, |bytes_read, chunk_res| {
                future::ready(Some(
                    chunk_res
                        .err_tip(|| "Failed to read chunk in update in redis store")
                        .and_then(|chunk| {
                            let offset = *bytes_read;
                            let chunk_len = u32::try_from(chunk.len()).err_tip(
                                || "Could not convert chunk length to u32 in RedisStore::update",
                            )?;
                            let new_bytes_read = bytes_read
                                .checked_add(chunk_len)
                                .err_tip(|| "Overflow protection in RedisStore::update")?;
                            *bytes_read = new_bytes_read;
                            Ok::<_, Error>((offset, *bytes_read, chunk))
                        }),
                ))
            })
            .map(|res| {
                let (offset, end_pos, chunk) = res?;
                let temp_key_ref = &temp_key;
                let client = client.client.clone();
                Ok(async move {
                    client
                        .setrange::<(), _, _>(temp_key_ref, offset, chunk)
                        .await
                        .err_tip(
                            || format!("While appending to temp key ({temp_key_ref}) in RedisStore::update. offset = {offset}. end_pos = {end_pos}"),
                        )?;
                    Ok::<u32, Error>(end_pos)
                })
            })
            .try_buffer_unordered(self.max_chunk_uploads_per_update);

        let mut total_len: u32 = 0;
        while let Some(last_pos) = read_stream.try_next().await? {
            if last_pos > total_len {
                total_len = last_pos;
            }
        }

        let blob_len = client
            .client
            .strlen::<u64, _>(&temp_key)
            .await
            .err_tip(|| format!("In RedisStore::update strlen check for {temp_key}"))?;
        // This is a safety check to ensure that in the event some kind of retry were to happen
        // and the data was appended to the key twice, we reject the data.
        if blob_len != u64::from(total_len) {
            return Err(make_input_err!(
                "Data length mismatch in RedisStore::update for {}({}) - expected {} bytes, got {} bytes",
                key.borrow().as_str(),
                temp_key,
                total_len,
                blob_len,
            ));
        }

        // Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost.
        client
            .client
            .rename::<(), _, _>(&temp_key, final_key.as_ref())
            .await
            .err_tip(|| "While queueing key rename in RedisStore::update()")?;

        // If we have a publish channel configured, send a notice that the key has been set.
        if let Some(pub_sub_channel) = &self.pub_sub_channel {
            return Ok(client
                .client
                .publish(pub_sub_channel, final_key.as_ref())
                .await?);
        }

        Ok(())
    }

    async fn get_part(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        writer: &mut DropCloserWriteHalf,
        offset: u64,
        length: Option<u64>,
    ) -> Result<(), Error> {
        let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?;
        let length = length
            .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
            .transpose()?;

        // To follow the RBE spec we need to consider any digest with
        // zero size to be existing.
        if is_zero_digest(key.borrow()) {
            return writer
                .send_eof()
                .err_tip(|| "Failed to send zero EOF in redis store get_part");
        }

        let encoded_key = self.encode_key(&key);
        let encoded_key = encoded_key.as_ref();

        // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we
        // do math with indices we change them to be exclusive at the end.
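        // For example (illustrative): to read 10 bytes starting at offset 0,
        // we issue GETRANGE key 0 9.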

        // We want to read the data at the key from `offset` to `offset + length`.
        let data_start = offset;
        let data_end = data_start
            .saturating_add(length.unwrap_or(isize::MAX as usize))
            .saturating_sub(1);

        // And we don't ever want to read more than `read_chunk_size` bytes at a time, so we'll need to iterate.
        let mut chunk_start = data_start;
        let mut chunk_end = cmp::min(
            data_start.saturating_add(self.read_chunk_size) - 1,
            data_end,
        );

        let client = self.get_client().await?;
        loop {
            let chunk: Bytes = client
                .client
                .getrange(encoded_key, chunk_start, chunk_end)
                .await
                .err_tip(|| "In RedisStore::get_part::getrange")?;

            let didnt_receive_full_chunk = chunk.len() < self.read_chunk_size;
            let reached_end_of_data = chunk_end == data_end;

            if didnt_receive_full_chunk || reached_end_of_data {
                if !chunk.is_empty() {
                    writer
                        .send(chunk)
                        .await
                        .err_tip(|| "Failed to write data in RedisStore::get_part")?;
                }

                break; // No more data to read.
            }

            // We received a full chunk's worth of data, so write it...
            writer
                .send(chunk)
                .await
                .err_tip(|| "Failed to write data in RedisStore::get_part")?;

            // ...and go grab the next chunk.
            chunk_start = chunk_end + 1;
            chunk_end = cmp::min(
                chunk_start.saturating_add(self.read_chunk_size) - 1,
                data_end,
            );
        }

        // If we didn't write any data, check if the key exists; if not, return a NotFound error.
        // This is required by the spec.
        if writer.get_bytes_written() == 0 {
            // We're supposed to read 0 bytes, so just check if the key exists.
            let exists = client
                .client
                .exists::<bool, _>(encoded_key)
                .await
                .err_tip(|| "In RedisStore::get_part::zero_exists")?;

            if !exists {
                return Err(make_err!(
                    Code::NotFound,
                    "Data not found in Redis store for digest: {key:?}"
                ));
            }
        }

        writer
            .send_eof()
            .err_tip(|| "Failed to write EOF in redis store get_part")
    }

    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }

    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) {
        self
    }

    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> {
        self
    }

    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
        registry.register_indicator(self);
    }

    fn register_remove_callback(
        self: Arc<Self>,
        _callback: Arc<dyn RemoveItemCallback>,
    ) -> Result<(), Error> {
        // Since redis doesn't drop items on its own, there is nothing to do here.
        Ok(())
    }
}

#[async_trait]
impl HealthStatusIndicator for RedisStore {
    fn get_name(&self) -> &'static str {
        "RedisStore"
    }

    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
        StoreDriver::check_health(Pin::new(self), namespace).await
    }
}

// -------------------------------------------------------------------
// Below this line is code specific to the redis scheduler implementation.
// -------------------------------------------------------------------

/// The maximum number of results to return per cursor.
const MAX_COUNT_PER_CURSOR: u64 = 256;
/// The time in milliseconds that a redis cursor can be idle before it is closed.
const CURSOR_IDLE_MS: u64 = 2_000;
/// The name of the field in the Redis hash that stores the data.
const DATA_FIELD_NAME: &str = "data";
/// The name of the field in the Redis hash that stores the version.
const VERSION_FIELD_NAME: &str = "version";
/// The time to live of indexes in seconds. After this time redis may delete the index.
const INDEX_TTL_S: u64 = 60 * 60 * 24; // 24 hours.

/// Lua script to set a key if the version matches.
/// Args:
/// KEYS[1]: The key where the version is stored.
/// ARGV[1]: The expected version.
/// ARGV[2]: The new data.
/// ARGV[3*]: Key-value pairs of additional data to include.
/// Returns:
/// { 1, new_version } if the version matched and the value was set, or
/// { 0, current_version } if the version did not match and nothing was set.
const LUA_VERSION_SET_SCRIPT: &str = formatcp!(
    r"
local key = KEYS[1]
local expected_version = tonumber(ARGV[1])
local new_data = ARGV[2]
local new_version = redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', 1)
local i
local indexes = {{}}

if new_version-1 ~= expected_version then
    redis.call('HINCRBY', key, '{VERSION_FIELD_NAME}', -1)
    return {{ 0, new_version-1 }}
end
-- Skip the first 2 ARGVs, as they are known inputs.
-- Remember: Lua is 1-indexed.
for i=3, #ARGV do
    indexes[i-2] = ARGV[i]
end

-- In testing we witnessed redis sometimes not updating our FT indexes,
-- resulting in stale data. It appears that if we delete our keys and then
-- insert them again it works, which reduces the risk significantly.
redis.call('DEL', key)
redis.call('HSET', key, '{DATA_FIELD_NAME}', new_data, '{VERSION_FIELD_NAME}', new_version, unpack(indexes))

return {{ 1, new_version }}
"
);
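// A sketch of the script's observable behavior (key and values illustrative):
// with hash "job1" at version 4, `EVALSHA <sha> 1 job1 4 <data> idx_field idx_val`
// rewrites the hash and returns { 1, 5 }, while calling it with an expected
// version of 3 leaves the data untouched and returns { 0, 4 }.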

/// This is the output of the calculations below hardcoded into the executable.
const FINGERPRINT_CREATE_INDEX_HEX: &str = "3e762c15";

#[cfg(test)]
mod test {
    use super::FINGERPRINT_CREATE_INDEX_HEX;

    /// String of the `FT.CREATE` command used to create the index template.
    const CREATE_INDEX_TEMPLATE: &str = "FT.CREATE {} ON HASH PREFIX 1 {} NOOFFSETS NOHL NOFIELDS NOFREQS SCHEMA {} TAG CASESENSITIVE SORTABLE";

    /// Compile-time fingerprint of the `FT.CREATE` command used to create the
    /// index template. This is a simple CRC32 checksum of the command string.
    /// We don't care about it actually being a valid CRC32 checksum, just that
    /// it's a unique identifier with a low chance of collision.
    const fn fingerprint_create_index_template() -> u32 {
        const POLY: u32 = 0xEDB8_8320;
        const DATA: &[u8] = CREATE_INDEX_TEMPLATE.as_bytes();
        let mut crc = 0xFFFF_FFFF;
        let mut i = 0;
        while i < DATA.len() {
            let byte = DATA[i];
            crc ^= byte as u32;

            let mut j = 0;
            while j < 8 {
                crc = if crc & 1 != 0 {
                    (crc >> 1) ^ POLY
                } else {
                    crc >> 1
                };
                j += 1;
            }
            i += 1;
        }
        crc
    }

    /// Verify that our calculation always evaluates to this fixed value.
    #[test]
    fn test_fingerprint_value() {
        assert_eq!(
            format!("{:08x}", &fingerprint_create_index_template()),
            FINGERPRINT_CREATE_INDEX_HEX,
        );
    }
}

/// Get the name of the index to create for the given field.
/// This adds fingerprint data to the name to help ensure that if the index
/// definition changes, the index gets a new name.
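/// For example (illustrative arguments): `get_index_name!("cas", "digest", Some("sort_key"))`
/// produces `cas_digest_sort_key_3e762c15`.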
macro_rules! get_index_name {
    ($prefix:expr, $field:expr, $maybe_sort:expr) => {
        format_args!(
            "{}_{}_{}_{}",
            $prefix,
            $field,
            $maybe_sort.unwrap_or(""),
            FINGERPRINT_CREATE_INDEX_HEX
        )
    };
}

/// Try to sanitize a string to be used as a Redis key.
/// We don't actually modify the string, just check if it's valid.
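/// For example (illustrative inputs): `"operation_123"` is accepted, while
/// `"foo-bar"` is rejected because `-` is neither alphanumeric nor `_`.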
const fn try_sanitize(s: &str) -> Option<&str> {
    // Note: We cannot use for loops or iterators here because they are not const.
    // Allowing us to use a const function here gives the compiler the ability to
    // optimize this function away entirely in the case where the input is constant.
    let chars = s.as_bytes();
    let mut i: usize = 0;
    let len = s.len();
    loop {
        if i >= len {
            break;
        }
        let c = chars[i];
        if !c.is_ascii_alphanumeric() && c != b'_' {
            return None;
        }
        i += 1;
    }
    Some(s)
}

/// An individual subscription to a key in Redis.
#[derive(Debug)]
pub struct RedisSubscription {
    receiver: Option<tokio::sync::watch::Receiver<String>>,
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
}

impl SchedulerSubscription for RedisSubscription {
    /// Wait for the subscription key to change.
    async fn changed(&mut self) -> Result<(), Error> {
        let receiver = self
            .receiver
            .as_mut()
            .ok_or_else(|| make_err!(Code::Internal, "In RedisSubscription::changed::as_mut"))?;
        receiver
            .changed()
            .await
            .map_err(|_| make_err!(Code::Internal, "In RedisSubscription::changed::changed"))
    }
}

// If the subscription is dropped, we need to possibly remove the key from the
// subscribed keys map.
impl Drop for RedisSubscription {
    fn drop(&mut self) {
        let Some(receiver) = self.receiver.take() else {
            warn!("RedisSubscription has already been dropped, nothing to do.");
            return; // Already dropped, nothing to do.
        };
        let key = receiver.borrow().clone();
        // IMPORTANT: This must be dropped before receiver_count() is called.
        drop(receiver);
        let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else {
            return; // Already dropped, nothing to do.
        };
        let mut subscribed_keys = subscribed_keys.write();
        let Some(value) = subscribed_keys.get(&key) else {
            error!(
                "Key {key} was not found in subscribed keys when checking if it should be removed."
            );
            return;
        };
        // If we have no receivers, clean up the entry from our map.
        if value.receiver_count() == 0 {
            subscribed_keys.remove(key);
        }
    }
}

/// A publisher for a key in Redis.
#[derive(Debug)]
struct RedisSubscriptionPublisher {
    sender: Mutex<tokio::sync::watch::Sender<String>>,
}

impl RedisSubscriptionPublisher {
    fn new(
        key: String,
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
    ) -> (Self, RedisSubscription) {
        let (sender, receiver) = tokio::sync::watch::channel(key);
        let publisher = Self {
            sender: Mutex::new(sender),
        };
        let subscription = RedisSubscription {
            receiver: Some(receiver),
            weak_subscribed_keys,
        };
        (publisher, subscription)
    }

    fn subscribe(
        &self,
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
    ) -> RedisSubscription {
        let receiver = self.sender.lock().subscribe();
        RedisSubscription {
            receiver: Some(receiver),
            weak_subscribed_keys,
        }
    }

    fn receiver_count(&self) -> usize {
        self.sender.lock().receiver_count()
    }

    fn notify(&self) {
        // TODO(https://github.com/sile/patricia_tree/issues/40) When this is addressed
        // we can remove the `Mutex` and use the mutable iterator directly.
        self.sender.lock().send_modify(|_| {});
    }
}

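/// Manages subscriptions to keys in Redis by fanning incoming pub/sub
/// messages out to per-key watch channels.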
#[derive(Debug)]
pub struct RedisSubscriptionManager {
    subscribed_keys: Arc<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
    tx_for_test: tokio::sync::mpsc::UnboundedSender<String>,
    _subscription_spawn: JoinHandleDropGuard<()>,
}

impl RedisSubscriptionManager {
    pub fn new(subscribe_client: SubscriberClient, pub_sub_channel: String) -> Self {
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);
        let (tx_for_test, mut rx_for_test) = tokio::sync::mpsc::unbounded_channel();
        Self {
            subscribed_keys,
            tx_for_test,
            _subscription_spawn: spawn!("redis_subscribe_spawn", async move {
                let mut rx = subscribe_client.message_rx();
                loop {
                    if let Err(e) = subscribe_client.subscribe(&pub_sub_channel).await {
                        error!("Error subscribing to pattern - {e}");
                        return;
                    }
                    let mut reconnect_rx = subscribe_client.reconnect_rx();
                    let reconnect_fut = reconnect_rx.recv().fuse();
                    tokio::pin!(reconnect_fut);
                    loop {
                        let key = select! {
                            value = rx_for_test.recv() => {
                                let Some(value) = value else {
                                    unreachable!("Channel should never close");
                                };
                                value.into()
                            },
                            msg = rx.recv() => {
                                match msg {
                                    Ok(msg) => {
                                        if let RedisValue::String(s) = msg.value {
                                            s
                                        } else {
                                            error!("Received non-string message in RedisSubscriptionManager");
                                            continue;
                                        }
                                    },
                                    Err(e) => {
                                        // Check to see if our parent has been dropped and if so kill the spawn.
                                        if subscribed_keys_weak.upgrade().is_none() {
                                            warn!("It appears our parent has been dropped, exiting RedisSubscriptionManager spawn");
                                            return;
                                        }
                                        error!("Error receiving message in RedisSubscriptionManager; reconnecting and flagging everything changed - {e}");
                                        break;
                                    }
                                }
                            },
                            _ = &mut reconnect_fut => {
                                warn!("Redis reconnected; flagging all subscriptions as changed and resuming");
                                break;
                            }
                        };
                        let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
                            warn!(
                                "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
                            );
                            return;
                        };
                        let subscribed_keys_mux = subscribed_keys.read();
                        subscribed_keys_mux
                            .common_prefix_values(&*key)
                            .for_each(RedisSubscriptionPublisher::notify);
                    }
                    // Sleep for a small amount of time to ensure we don't reconnect too quickly.
                    sleep(Duration::from_secs(1)).await;
                    // If we reconnect or lag behind we might have had dirty keys, so we need to
                    // flag all of them as changed.
                    let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
                        warn!(
                            "It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"
                        );
                        return;
                    };
                    let subscribed_keys_mux = subscribed_keys.read();
                    // Just in case, also get a new receiver.
                    rx = subscribe_client.message_rx();
                    // Drop all buffered messages, then flag everything as changed.
                    rx.resubscribe();
                    for publisher in subscribed_keys_mux.values() {
                        publisher.notify();
                    }
                }
            }),
        }
    }
}

impl SchedulerSubscriptionManager for RedisSubscriptionManager {
    type Subscription = RedisSubscription;

    fn notify_for_test(&self, value: String) {
        self.tx_for_test.send(value).unwrap();
    }

    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
    where
        K: SchedulerStoreKeyProvider,
    {
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
        let mut subscribed_keys = self.subscribed_keys.write();
        let key = key.get_key();
        let key_str = key.as_str();
        let mut subscription = if let Some(publisher) = subscribed_keys.get(&key_str) {
            publisher.subscribe(weak_subscribed_keys)
        } else {
            let (publisher, subscription) =
                RedisSubscriptionPublisher::new(key_str.to_string(), weak_subscribed_keys);
            subscribed_keys.insert(key_str, publisher);
            subscription
        };
        subscription
            .receiver
            .as_mut()
            .ok_or_else(|| {
                make_err!(
                    Code::Internal,
                    "Receiver should be set in RedisSubscriptionManager::subscribe"
                )
            })?
            .mark_changed();

        Ok(subscription)
    }

    fn is_reliable() -> bool {
        false
    }
}

impl SchedulerStore for RedisStore {
    type SubscriptionManager = RedisSubscriptionManager;

    fn subscription_manager(&self) -> Result<Arc<RedisSubscriptionManager>, Error> {
        let mut subscription_manager = self.subscription_manager.lock();

        if let Some(subscription_manager) = &*subscription_manager {
            Ok(subscription_manager.clone())
        } else {
            let Some(pub_sub_channel) = &self.pub_sub_channel else {
                return Err(make_input_err!(
                    "RedisStore must have a pubsub channel for a Redis Scheduler if using subscriptions"
                ));
            };
            let sub = Arc::new(RedisSubscriptionManager::new(
                self.subscriber_client.clone(),
                pub_sub_channel.clone(),
            ));
            *subscription_manager = Some(sub.clone());
            Ok(sub)
        }
    }

1229 | 16 | async fn update_data<T>(&self, data: T) -> Result<Option<i64>, Error> |
1230 | 16 | where |
1231 | 16 | T: SchedulerStoreDataProvider |
1232 | 16 | + SchedulerStoreKeyProvider |
1233 | 16 | + SchedulerCurrentVersionProvider |
1234 | 16 | + Send, |
1235 | 16 | { |
1236 | 16 | let key = data.get_key(); |
1237 | 16 | let redis_key = self.encode_key(&key); |
1238 | 16 | let client = self.get_client().await?0 ; |
1239 | 16 | let maybe_index = data.get_indexes().err_tip(|| {0 |
1240 | 0 | format!("Err getting index in RedisStore::update_data::versioned for {redis_key}") |
1241 | 0 | })?; |
1242 | 16 | if <T as SchedulerStoreKeyProvider>::Versioned::VALUE { Branch (1242:12): [True: 0, False: 0]
Branch (1242:12): [True: 0, False: 0]
Branch (1242:12): [True: 0, False: 0]
Branch (1242:12): [Folded - Ignored]
Branch (1242:12): [Folded - Ignored]
Branch (1242:12): [True: 0, False: 3]
Branch (1242:12): [True: 0, False: 0]
Branch (1242:12): [True: 13, False: 0]
|
1243 | 13 | let current_version = data.current_version(); |
1244 | 13 | let data = data.try_into_bytes().err_tip(|| {0 |
1245 | 0 | format!("Could not convert value to bytes in RedisStore::update_data::versioned for {redis_key}") |
1246 | 0 | })?; |
1247 | 13 | let mut argv = Vec::with_capacity(3 + maybe_index.len() * 2); |
1248 | 13 | argv.push(Bytes::from(format!("{current_version}"))); |
1249 | 13 | argv.push(data); |
1250 | 52 | for (name39 , value39 ) in maybe_index { |
1251 | 39 | argv.push(Bytes::from_static(name.as_bytes())); |
1252 | 39 | argv.push(value); |
1253 | 39 | } |
1254 | 13 | let start = std::time::Instant::now(); |
1255 | | |
1256 | 13 | let (success, new_version): (bool, i64) = self |
1257 | 13 | .update_if_version_matches_script |
1258 | 13 | .evalsha_with_reload(&client.client, vec![redis_key.as_ref()], argv) |
1259 | 13 | .await |
1260 | 13 | .err_tip(|| format!("In RedisStore::update_data::versioned for {key:?}"0 ))?0 ; |
1261 | | |
1262 | 13 | let elapsed = start.elapsed(); |
1263 | | |
1264 | 13 | if elapsed > Duration::from_millis(100) { Branch (1264:16): [True: 0, False: 0]
Branch (1264:16): [True: 0, False: 0]
Branch (1264:16): [True: 0, False: 0]
Branch (1264:16): [Folded - Ignored]
Branch (1264:16): [Folded - Ignored]
Branch (1264:16): [True: 0, False: 0]
Branch (1264:16): [True: 0, False: 0]
Branch (1264:16): [True: 0, False: 13]
|
1265 | 0 | warn!( |
1266 | | %redis_key, |
1267 | | ?elapsed, |
1268 | 0 | "Slow Redis version-set operation" |
1269 | | ); |
1270 | 13 | } |
1271 | 13 | if !success { Branch (1271:16): [True: 0, False: 0]
Branch (1271:16): [True: 0, False: 0]
Branch (1271:16): [True: 0, False: 0]
Branch (1271:16): [Folded - Ignored]
Branch (1271:16): [Folded - Ignored]
Branch (1271:16): [True: 0, False: 0]
Branch (1271:16): [True: 0, False: 0]
Branch (1271:16): [True: 1, False: 12]
|
1272 | 1 | warn!( |
1273 | | %redis_key, |
1274 | | %key, |
1275 | | %current_version, |
1276 | | %new_version, |
1277 | 1 | caller = core::any::type_name::<T>(), |
1278 | 1 | "Redis version conflict - optimistic lock failed" |
1279 | | ); |
1280 | 1 | return Ok(None); |
1281 | 12 | } |
1282 | 12 | trace!( |
1283 | | %redis_key, |
1284 | | %key, |
1285 | | old_version = %current_version, |
1286 | | %new_version, |
1287 | 12 | "Updated redis key to new version" |
1288 | | ); |
1289 | | // If we have a publish channel configured, send a notice that the key has been set. |
1290 | 12 | if let Some(pub_sub_channel) = &self.pub_sub_channel { Branch (1290:20): [True: 0, False: 0]
Branch (1290:20): [True: 0, False: 0]
Branch (1290:20): [True: 0, False: 0]
Branch (1290:20): [Folded - Ignored]
Branch (1290:20): [Folded - Ignored]
Branch (1290:20): [True: 0, False: 0]
Branch (1290:20): [True: 0, False: 0]
Branch (1290:20): [True: 12, False: 0]
|
1291 | 12 | return Ok(client |
1292 | 12 | .client |
1293 | 12 | .publish(pub_sub_channel, redis_key.as_ref()) |
1294 | 12 | .await?0 ); |
1295 | 0 | } |
1296 | 0 | Ok(Some(new_version)) |
1297 | | } else { |
1298 | 3 | let data = data.try_into_bytes().err_tip(|| {0 |
1299 | 0 | format!("Could not convert value to bytes in RedisStore::update_data::noversion for {redis_key}") |
1300 | 0 | })?; |
1301 | 3 | let mut fields = RedisMap::new(); |
1302 | 3 | fields.reserve(1 + maybe_index.len()); |
1303 | 3 | fields.insert(DATA_FIELD_NAME.into(), data.into()); |
1304 | 3 | for (name0 , value0 ) in maybe_index { |
1305 | 0 | fields.insert(name.into(), value.into()); |
1306 | 0 | } |
1307 | 3 | client |
1308 | 3 | .client |
1309 | 3 | .hset::<(), _, _>(redis_key.as_ref(), fields) |
1310 | 3 | .await |
1311 | 3 | .err_tip(|| format!("In RedisStore::update_data::noversion for {redis_key}"0 ))?0 ; |
1312 | | // If we have a publish channel configured, send a notice that the key has been set. |
 1313 | 3 | if let Some(pub_sub_channel) = &self.pub_sub_channel {
  Branch (1313:20): [True: 0, False: 0]
  Branch (1313:20): [True: 0, False: 0]
  Branch (1313:20): [True: 0, False: 0]
  Branch (1313:20): [Folded - Ignored]
  Branch (1313:20): [Folded - Ignored]
  Branch (1313:20): [True: 3, False: 0]
  Branch (1313:20): [True: 0, False: 0]
  Branch (1313:20): [True: 0, False: 0]
|
1314 | 3 | return Ok(client |
1315 | 3 | .client |
1316 | 3 | .publish(pub_sub_channel, redis_key.as_ref()) |
1317 | 3 | .await?0 ); |
1318 | 0 | } |
1319 | 0 | Ok(Some(0)) // Always use "0" version since this is not a versioned request. |
1320 | | } |
1321 | 16 | } |
1322 | | |
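The versioned branch above relies on a server-side compare-and-swap: update_if_version_matches_script receives the version the caller read, the serialized data, and the index fields, then either bumps the version atomically or reports the conflict (in which case the function logs and returns Ok(None) so the caller can re-read and retry). On success, when pub_sub_channel is configured, the key is published so subscription watchers wake up. Below is a minimal sketch of the script's shape only, assuming hypothetical field names 'version' and 'data' in place of VERSION_FIELD_NAME and DATA_FIELD_NAME; the script actually registered by the store may differ:

    // Sketch only, not the registered script. KEYS[1] is the hash key;
    // ARGV[1] is the version the caller read, ARGV[2] the new data, and
    // ARGV[3..] alternating index field names/values. Runs atomically.
    const UPDATE_IF_VERSION_MATCHES_SKETCH: &str = r#"
    local current = tonumber(redis.call('HGET', KEYS[1], 'version') or '0')
    if current ~= tonumber(ARGV[1]) then
        return {0, current}  -- optimistic lock failed; report winning version
    end
    local new_version = current + 1
    redis.call('HSET', KEYS[1], 'version', new_version, 'data', ARGV[2])
    for i = 3, #ARGV, 2 do
        redis.call('HSET', KEYS[1], ARGV[i], ARGV[i + 1])
    end
    return {1, new_version}
    "#;

The two-element Lua table maps onto the (bool, i64) tuple the caller destructures as (success, new_version).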
1323 | 20 | async fn search_by_index_prefix<K>( |
1324 | 20 | &self, |
1325 | 20 | index: K, |
1326 | 20 | ) -> Result< |
1327 | 20 | impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send, |
1328 | 20 | Error, |
1329 | 20 | > |
1330 | 20 | where |
1331 | 20 | K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send, |
1332 | 20 | { |
1333 | 20 | let index_value = index.index_value(); |
1334 | 20 | let sanitized_field = try_sanitize(index_value.as_ref()) |
1335 | 20 | .err_tip(|| {0 |
1336 | 0 | format!("In RedisStore::search_by_index_prefix::try_sanitize - {index_value:?}") |
1337 | 0 | })? |
1338 | 20 | .to_string(); |
1339 | 20 | let index_name = format!( |
1340 | 20 | "{}", |
1341 | 20 | get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY) |
1342 | | ); |
1343 | | |
1344 | 20 | let run_ft_aggregate = |client: Arc<ClientWithPermit>, |
1345 | | index_name: String, |
1346 | 20 | sanitized_field: String| async move { |
1347 | 20 | ft_aggregate( |
1348 | 20 | client.client.clone(), |
1349 | 20 | index_name, |
 1350 | 20 | if sanitized_field.is_empty() {
  Branch (1350:20): [True: 0, False: 0]
  Branch (1350:20): [True: 0, False: 0]
  Branch (1350:20): [Folded - Ignored]
  Branch (1350:20): [Folded - Ignored]
  Branch (1350:20): [True: 0, False: 17]
  Branch (1350:20): [True: 0, False: 3]
|
1351 | 0 | "*".to_string() |
1352 | | } else { |
1353 | 20 | format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_field) |
1354 | | }, |
1355 | | FtAggregateOptions { |
1356 | 20 | load: Some(Load::Some(vec![ |
1357 | 20 | SearchField { |
1358 | 20 | identifier: DATA_FIELD_NAME.into(), |
1359 | 20 | property: None, |
1360 | 20 | }, |
1361 | 20 | SearchField { |
1362 | 20 | identifier: VERSION_FIELD_NAME.into(), |
1363 | 20 | property: None, |
1364 | 20 | }, |
1365 | 20 | ])), |
1366 | 20 | cursor: Some(WithCursor { |
1367 | 20 | count: Some(MAX_COUNT_PER_CURSOR), |
1368 | 20 | max_idle: Some(CURSOR_IDLE_MS), |
1369 | 20 | }), |
1370 | 20 | pipeline: vec![AggregateOperation::SortBy { |
1371 | 20 | properties: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| {17 |
1372 | 17 | vec![(format!("@{v}").into(), SortOrder::Asc)] |
1373 | 17 | }), |
1374 | 20 | max: None, |
1375 | | }], |
1376 | 20 | ..Default::default() |
1377 | | }, |
1378 | | ) |
1379 | 20 | .await |
1380 | 20 | .map(|stream| (stream, client)) |
1381 | 40 | }; |
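As an aside, the doubled braces in the format! call above are escapes for literal '{' and '}', so the filter handed to FT.AGGREGATE is a RediSearch tag query. With hypothetical values:

    // Hypothetical index name and sanitized value, showing the query shape.
    assert_eq!(
        format!("@{}:{{ {} }}", "operation_id", "abc123"),
        "@operation_id:{ abc123 }",
    );

An empty sanitized field instead falls back to the match-all query "*".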
1382 | | |
1383 | 20 | let client = Arc::new(self.get_client().await?0 ); |
 1384 | 20 | let (stream, client_guard) = if let Ok(result) =
  Branch (1384:45): [True: 0, False: 0]
  Branch (1384:45): [True: 0, False: 0]
  Branch (1384:45): [Folded - Ignored]
  Branch (1384:45): [Folded - Ignored]
  Branch (1384:45): [True: 17, False: 0]
  Branch (1384:45): [True: 3, False: 0]
|
1385 | 20 | run_ft_aggregate(client.clone(), index_name.clone(), sanitized_field.clone()).await |
1386 | | { |
1387 | 20 | result |
1388 | | } else { |
1389 | 0 | let mut schema = vec![SearchSchema { |
1390 | 0 | field_name: K::INDEX_NAME.into(), |
1391 | 0 | alias: None, |
1392 | 0 | kind: SearchSchemaKind::Tag { |
1393 | 0 | sortable: false, |
1394 | 0 | unf: false, |
1395 | 0 | separator: None, |
1396 | 0 | casesensitive: false, |
1397 | 0 | withsuffixtrie: false, |
1398 | 0 | noindex: false, |
1399 | 0 | }, |
1400 | 0 | }]; |
 1401 | 0 | if let Some(sort_key) = K::MAYBE_SORT_KEY {
  Branch (1401:20): [True: 0, False: 0]
  Branch (1401:20): [True: 0, False: 0]
  Branch (1401:20): [Folded - Ignored]
  Branch (1401:20): [Folded - Ignored]
  Branch (1401:20): [True: 0, False: 0]
  Branch (1401:20): [True: 0, False: 0]
|
1402 | 0 | schema.push(SearchSchema { |
1403 | 0 | field_name: sort_key.into(), |
1404 | 0 | alias: None, |
1405 | 0 | kind: SearchSchemaKind::Tag { |
1406 | 0 | sortable: true, |
1407 | 0 | unf: false, |
1408 | 0 | separator: None, |
1409 | 0 | casesensitive: false, |
1410 | 0 | withsuffixtrie: false, |
1411 | 0 | noindex: false, |
1412 | 0 | }, |
1413 | 0 | }); |
1414 | 0 | } |
1415 | 0 | let create_result: Result<(), Error> = { |
1416 | 0 | let create_client = self.get_client().await?; |
1417 | 0 | create_client |
1418 | 0 | .client |
1419 | 0 | .ft_create::<(), _>( |
1420 | 0 | index_name.clone(), |
1421 | 0 | FtCreateOptions { |
1422 | 0 | on: Some(IndexKind::Hash), |
1423 | 0 | prefixes: vec![K::KEY_PREFIX.into()], |
1424 | 0 | nohl: true, |
1425 | 0 | nofields: true, |
1426 | 0 | nofreqs: true, |
1427 | 0 | nooffsets: true, |
1428 | 0 | temporary: Some(INDEX_TTL_S), |
1429 | 0 | ..Default::default() |
1430 | 0 | }, |
1431 | 0 | schema, |
1432 | 0 | ) |
1433 | 0 | .await |
1434 | 0 | .err_tip(|| { |
1435 | 0 | format!( |
1436 | 0 | "Error with ft_create in RedisStore::search_by_index_prefix({})", |
1437 | 0 | get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY), |
1438 | | ) |
1439 | 0 | })?; |
1440 | 0 | Ok(()) |
1441 | | }; |
1442 | 0 | let retry_client = Arc::new(self.get_client().await?); |
1443 | 0 | let retry_result = |
1444 | 0 | run_ft_aggregate(retry_client, index_name.clone(), sanitized_field.clone()).await; |
 1445 | 0 | if let Ok(result) = retry_result {
  Branch (1445:20): [True: 0, False: 0]
  Branch (1445:20): [True: 0, False: 0]
  Branch (1445:20): [Folded - Ignored]
  Branch (1445:20): [Folded - Ignored]
  Branch (1445:20): [True: 0, False: 0]
  Branch (1445:20): [True: 0, False: 0]
|
1446 | 0 | result |
1447 | | } else { |
1448 | 0 | let e: Error = retry_result |
1449 | 0 | .err() |
1450 | 0 | .expect("Checked for Ok result above") |
1451 | 0 | .into(); |
1452 | 0 | let err = match create_result { |
1453 | 0 | Ok(()) => e, |
1454 | 0 | Err(create_err) => create_err.merge(e), |
1455 | | }; |
1456 | 0 | return Err(err); |
1457 | | } |
1458 | | }; |
1459 | | |
1460 | 20 | Ok(stream.map(move |result| {7 |
1461 | 7 | let keep_alive = client_guard.clone(); |
1462 | 7 | let _ = &keep_alive; |
1463 | 7 | let mut redis_map = |
 1464 | 7 | result.err_tip(|| "Error in stream in RedisStore::search_by_index_prefix")?0 ;
1465 | 7 | let bytes_data = redis_map |
1466 | 7 | .remove(&RedisKey::from_static_str(DATA_FIELD_NAME)) |
1467 | 7 | .err_tip(|| "Missing data field in RedisStore::search_by_index_prefix")?0 |
1468 | 7 | .into_bytes() |
1469 | 7 | .err_tip(|| {0 |
1470 | 0 | formatcp!("'{DATA_FIELD_NAME}' is not Bytes in RedisStore::search_by_index_prefix::into_bytes") |
1471 | 0 | })?; |
 1472 | 7 | let version = if <K as SchedulerIndexProvider>::Versioned::VALUE {
  Branch (1472:30): [True: 0, False: 0]
  Branch (1472:30): [True: 0, False: 0]
  Branch (1472:30): [Folded - Ignored]
  Branch (1472:30): [Folded - Ignored]
  Branch (1472:30): [True: 5, False: 0]
  Branch (1472:30): [True: 2, False: 0]
|
1473 | 7 | redis_map |
1474 | 7 | .remove(&RedisKey::from_static_str(VERSION_FIELD_NAME)) |
1475 | 7 | .err_tip(|| "Missing version field in RedisStore::search_by_index_prefix")?0 |
1476 | 7 | .as_i64() |
1477 | 7 | .err_tip(|| {0 |
 1478 | 0 | formatcp!("'{VERSION_FIELD_NAME}' is not i64 in RedisStore::search_by_index_prefix::as_i64")
1479 | 0 | })? |
1480 | | } else { |
1481 | 0 | 0 |
1482 | | }; |
1483 | 7 | K::decode(version, bytes_data) |
1484 | 7 | .err_tip(|| "In RedisStore::search_by_index_prefix::decode") |
1485 | 7 | })) |
1486 | 20 | } |
1487 | | |
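Two details in search_by_index_prefix are easy to miss. First, the index is created lazily: the first FT.AGGREGATE fails if the index does not yet exist, so the code issues FT.CREATE (temporary, so an idle index ages out after INDEX_TTL_S) and retries the aggregate once, merging the creation error into the retry error when both fail. Second, the keep_alive clone inside the stream closure exists only to keep the pooled client, and with it its semaphore permit, alive while the cursor is drained. A minimal self-contained sketch of the lazy-create-then-retry shape, with a hypothetical helper signature (with_lazy_index is not part of the store):

    use nativelink_error::Error;

    // Sketch of the pattern above: try the query, create the index on
    // failure, retry once, and merge errors so the caller sees both causes.
    async fn with_lazy_index<T, Q, QF, C, CF>(mut query: Q, create: C) -> Result<T, Error>
    where
        Q: FnMut() -> QF,
        QF: core::future::Future<Output = Result<T, Error>>,
        C: FnOnce() -> CF,
        CF: core::future::Future<Output = Result<(), Error>>,
    {
        if let Ok(rows) = query().await {
            return Ok(rows); // fast path: index already exists
        }
        let create_result = create().await; // e.g. FT.CREATE with a TTL
        match query().await {
            Ok(rows) => Ok(rows),
            Err(retry_err) => match create_result {
                Ok(()) => Err(retry_err),
                Err(create_err) => Err(create_err.merge(retry_err)),
            },
        }
    }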
1488 | 60 | async fn get_and_decode<K>( |
1489 | 60 | &self, |
1490 | 60 | key: K, |
1491 | 60 | ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error> |
1492 | 60 | where |
1493 | 60 | K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send, |
1494 | 60 | { |
1495 | 60 | let key = key.get_key(); |
1496 | 60 | let key = self.encode_key(&key); |
1497 | 60 | let client = self.get_client().await?0 ; |
1498 | 60 | let (maybe_version, maybe_data) = client |
1499 | 60 | .client |
1500 | 60 | .hmget::<(Option<i64>, Option<Bytes>), _, _>( |
1501 | 60 | key.as_ref(), |
1502 | 60 | vec![ |
1503 | 60 | RedisKey::from(VERSION_FIELD_NAME), |
1504 | 60 | RedisKey::from(DATA_FIELD_NAME), |
1505 | 60 | ], |
1506 | 60 | ) |
1507 | 60 | .await |
 1508 | 60 | .err_tip(|| format!("In RedisStore::get_and_decode {key}"0 ))?0 ;
 1509 | 60 | let Some(data31 ) = maybe_data else {
  Branch (1509:13): [True: 0, False: 0]
  Branch (1509:13): [True: 0, False: 0]
  Branch (1509:13): [True: 0, False: 0]
  Branch (1509:13): [Folded - Ignored]
  Branch (1509:13): [Folded - Ignored]
  Branch (1509:13): [True: 2, False: 0]
  Branch (1509:13): [True: 0, False: 28]
  Branch (1509:13): [True: 29, False: 1]
|
1510 | 29 | return Ok(None); |
1511 | | }; |
1512 | 31 | Ok(Some(K::decode(maybe_version.unwrap_or(0), data).err_tip( |
 1513 | 0 | || format!("In RedisStore::get_and_decode::decode {key}"),
1514 | 0 | )?)) |
1515 | 60 | } |
1516 | | } |
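get_and_decode fetches both fields in a single HMGET round trip. A missing data field means the entry does not exist and short-circuits to Ok(None); a missing version decodes as 0, matching the unversioned write path in update_data. A small self-contained sketch of that decode contract (decode_row is a hypothetical helper, not part of the store):

    // Sketch: absent data => Ok(None); absent version => treat as version 0.
    fn decode_row<T>(
        maybe_version: Option<i64>,
        maybe_data: Option<bytes::Bytes>,
        decode: impl FnOnce(i64, bytes::Bytes) -> Result<T, nativelink_error::Error>,
    ) -> Result<Option<T>, nativelink_error::Error> {
        let Some(data) = maybe_data else {
            return Ok(None); // key (or its data field) was never written
        };
        Ok(Some(decode(maybe_version.unwrap_or(0), data)?))
    }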