/build/source/src/bin/redis_store_tester.rs
Line | Count | Source |
1 | | use core::sync::atomic::{AtomicUsize, Ordering}; |
2 | | use core::time::Duration; |
3 | | use std::borrow::Cow; |
4 | | use std::env; |
5 | | use std::sync::{Arc, RwLock}; |
6 | | |
7 | | use bytes::Bytes; |
8 | | use clap::{Parser, ValueEnum}; |
9 | | use futures::TryStreamExt; |
10 | | use nativelink_config::stores::{RedisMode, RedisSpec}; |
11 | | use nativelink_error::{Code, Error, ResultExt}; |
12 | | use nativelink_store::redis_store::RedisStore; |
13 | | use nativelink_util::buf_channel::make_buf_channel_pair; |
14 | | use nativelink_util::store_trait::{ |
15 | | SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore, |
16 | | SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider, StoreDriver, |
17 | | StoreKey, StoreLike, TrueValue, UploadSizeInfo, |
18 | | }; |
19 | | use nativelink_util::telemetry::init_tracing; |
20 | | use nativelink_util::{background_spawn, spawn}; |
21 | | use rand::Rng; |
22 | | use tokio::time::sleep; |
23 | | use tracing::{error, info}; |
24 | | |
25 | | // Define test structures that implement the scheduler traits |
26 | | #[derive(Debug, Clone, PartialEq)] |
27 | | struct TestSchedulerData { |
28 | | key: String, |
29 | | content: String, |
30 | | version: i64, |
31 | | } |
32 | | |
33 | | #[derive(Debug)] |
34 | | struct TestSchedulerReturn { |
35 | | version: i64, |
36 | | } |
37 | | |
38 | | impl SchedulerStoreKeyProvider for TestSchedulerData { |
39 | | type Versioned = TrueValue; // Using versioned storage |
40 | | |
41 | 0 | fn get_key(&self) -> StoreKey<'static> { |
42 | 0 | StoreKey::Str(Cow::Owned(self.key.clone())) |
43 | 0 | } |
44 | | } |
45 | | |
46 | | impl SchedulerStoreDataProvider for TestSchedulerData { |
47 | 0 | fn try_into_bytes(self) -> Result<Bytes, Error> { |
48 | 0 | Ok(Bytes::from(self.content.into_bytes())) |
49 | 0 | } |
50 | | |
51 | 0 | fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> { |
52 | | // Add some test indexes - need to use 'static strings |
53 | 0 | Ok(vec![ |
54 | 0 | ("test_index", Bytes::from("test_value")), |
55 | 0 | ( |
56 | 0 | "content_prefix", |
57 | 0 | Bytes::from(self.content.chars().take(10).collect::<String>()), |
58 | 0 | ), |
59 | 0 | ]) |
60 | 0 | } |
61 | | } |
62 | | |
63 | | impl SchedulerStoreDecodeTo for TestSchedulerData { |
64 | | type DecodeOutput = TestSchedulerReturn; |
65 | | |
66 | 0 | fn decode(version: i64, _data: Bytes) -> Result<Self::DecodeOutput, Error> { |
67 | 0 | Ok(TestSchedulerReturn { version }) |
68 | 0 | } |
69 | | } |
70 | | |
71 | | impl SchedulerCurrentVersionProvider for TestSchedulerData { |
72 | 0 | fn current_version(&self) -> i64 { |
73 | 0 | self.version |
74 | 0 | } |
75 | | } |
76 | | |
77 | | struct SearchByContentPrefix { |
78 | | prefix: String, |
79 | | } |
80 | | |
81 | | impl SchedulerIndexProvider for SearchByContentPrefix { |
82 | | const KEY_PREFIX: &'static str = "test:"; |
83 | | const INDEX_NAME: &'static str = "content_prefix"; |
84 | | type Versioned = TrueValue; |
85 | | |
86 | 0 | fn index_value(&self) -> Cow<'_, str> { |
87 | 0 | Cow::Borrowed(&self.prefix) |
88 | 0 | } |
89 | | } |
90 | | |
91 | | impl SchedulerStoreKeyProvider for SearchByContentPrefix { |
92 | | type Versioned = TrueValue; |
93 | | |
94 | 0 | fn get_key(&self) -> StoreKey<'static> { |
95 | 0 | StoreKey::Str(Cow::Owned("dummy_key".to_string())) |
96 | 0 | } |
97 | | } |
98 | | |
99 | | impl SchedulerStoreDecodeTo for SearchByContentPrefix { |
100 | | type DecodeOutput = TestSchedulerReturn; |
101 | | |
102 | 0 | fn decode(version: i64, data: Bytes) -> Result<Self::DecodeOutput, Error> { |
103 | 0 | TestSchedulerData::decode(version, data) |
104 | 0 | } |
105 | | } |
106 | | |
107 | | const MAX_KEY: u16 = 1024; |
108 | | |
109 | | /// Wrapper type for CLI parsing since we can't implement foreign traits on foreign types. |
110 | | #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, ValueEnum)] |
111 | | enum RedisModeArg { |
112 | | Cluster, |
113 | | Sentinel, |
114 | | #[default] |
115 | | Standard, |
116 | | } |
117 | | |
118 | | impl From<RedisModeArg> for RedisMode { |
119 | 0 | fn from(arg: RedisModeArg) -> Self { |
120 | 0 | match arg { |
121 | 0 | RedisModeArg::Standard => Self::Standard, |
122 | 0 | RedisModeArg::Sentinel => Self::Sentinel, |
123 | 0 | RedisModeArg::Cluster => Self::Cluster, |
124 | | } |
125 | 0 | } |
126 | | } |
127 | | |
128 | 0 | fn random_key() -> StoreKey<'static> { |
129 | 0 | let key = rand::rng().random_range(0..MAX_KEY); |
130 | 0 | StoreKey::new_str(&key.to_string()).into_owned() |
131 | 0 | } |
132 | | |
133 | | #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, ValueEnum)] |
134 | | enum TestMode { |
135 | | #[default] |
136 | | Random, |
137 | | Sequential, |
138 | | } |
139 | | |
140 | | #[derive(Parser, Debug)] |
141 | | #[command(version, about)] |
142 | | struct Args { |
143 | | #[arg(value_enum, short, long, default_value_t)] |
144 | | redis_mode: RedisModeArg, |
145 | | |
146 | | #[arg(value_enum, short, long, default_value_t)] |
147 | | mode: TestMode, |
148 | | } |
149 | | |
150 | 0 | async fn run<S: StoreDriver + SchedulerStore>( |
151 | 0 | store: Arc<S>, |
152 | 0 | max_loops: usize, |
153 | 0 | failed: Arc<RwLock<bool>>, |
154 | 0 | mode: TestMode, |
155 | 0 | ) -> Result<(), Error> { |
156 | 0 | let mut count = 0; |
157 | 0 | let in_flight = Arc::new(AtomicUsize::new(0)); |
158 | | |
159 | | loop { |
160 | 0 | if count % 1000 == 0 { Branch (160:12): [Folded - Ignored]
|
161 | 0 | info!( |
162 | 0 | "Loop count {count}. In flight: {}", |
163 | 0 | in_flight.load(Ordering::Relaxed) |
164 | | ); |
165 | 0 | if *failed.read().unwrap() { Branch (165:16): [Folded - Ignored]
|
166 | 0 | return Err(Error::new( |
167 | 0 | Code::Internal, |
168 | 0 | "Failed in redis_store_tester".to_string(), |
169 | 0 | )); |
170 | 0 | } |
171 | 0 | } |
172 | 0 | if count == max_loops { Branch (172:12): [Folded - Ignored]
|
173 | | loop { |
174 | 0 | let remaining = in_flight.load(Ordering::Relaxed); |
175 | 0 | if remaining == 0 { Branch (175:20): [Folded - Ignored]
|
176 | 0 | return Ok(()); |
177 | 0 | } |
178 | 0 | info!(remaining, "Remaining"); |
179 | 0 | sleep(Duration::from_secs(1)).await; |
180 | | } |
181 | 0 | } |
182 | 0 | count += 1; |
183 | 0 | in_flight.fetch_add(1, Ordering::Relaxed); |
184 | | |
185 | 0 | let store_clone = store.clone(); |
186 | 0 | let local_fail = failed.clone(); |
187 | 0 | let local_in_flight = in_flight.clone(); |
188 | | |
189 | 0 | let max_action_value = 7; |
190 | 0 | let action_value = match mode { |
191 | 0 | TestMode::Random => rand::rng().random_range(0..max_action_value), |
192 | 0 | TestMode::Sequential => count % max_action_value, |
193 | | }; |
194 | | |
195 | 0 | background_spawn!("action", async move { |
196 | 0 | async fn run_action<S: StoreDriver + SchedulerStore>( |
197 | 0 | action_value: usize, |
198 | 0 | store_clone: Arc<S>, |
199 | 0 | ) -> Result<(), Error> { |
200 | 0 | match action_value { |
201 | | 0 => { |
202 | 0 | store_clone.has(random_key()).await?; |
203 | | } |
204 | | 1 => { |
205 | 0 | let (mut tx, rx) = make_buf_channel_pair(); |
206 | 0 | tx.send(Bytes::from_static(b"12345")).await?; |
207 | 0 | tx.send_eof()?; |
208 | 0 | store_clone |
209 | 0 | .update(random_key(), rx, UploadSizeInfo::ExactSize(5)) |
210 | 0 | .await?; |
211 | | } |
212 | | 2 => { |
213 | 0 | let mut results = (0..MAX_KEY).map(|_| None).collect::<Vec<_>>(); |
214 | | |
215 | 0 | store_clone |
216 | 0 | .has_with_results( |
217 | 0 | &(0..MAX_KEY) |
218 | 0 | .map(|i| StoreKey::Str(Cow::Owned(i.to_string()))) |
219 | 0 | .collect::<Vec<_>>(), |
220 | 0 | &mut results, |
221 | | ) |
222 | 0 | .await?; |
223 | | } |
224 | | 3 => { |
225 | 0 | store_clone |
226 | 0 | .update_oneshot(random_key(), Bytes::from_static(b"1234")) |
227 | 0 | .await?; |
228 | | } |
229 | | 4 => { |
230 | 0 | let res = store_clone |
231 | 0 | .list(.., |_key| true) |
232 | 0 | .await |
233 | 0 | .err_tip(|| "In list")?; |
234 | 0 | info!(%res, "end list"); |
235 | | } |
236 | | 5 => { |
237 | 0 | let search_provider = SearchByContentPrefix { |
238 | 0 | prefix: "Searchable".to_string(), |
239 | 0 | }; |
240 | 0 | for i in 0..5 { |
241 | 0 | let data = TestSchedulerData { |
242 | 0 | key: format!("test:search_key_{i}"), |
243 | 0 | content: format!("Searchable content #{i}"), |
244 | 0 | version: 0, |
245 | 0 | }; |
246 | | |
247 | 0 | store_clone.update_data(data).await?; |
248 | | } |
249 | 0 | let search_results: Vec<_> = store_clone |
250 | 0 | .search_by_index_prefix(search_provider) |
251 | 0 | .await? |
252 | 0 | .try_collect() |
253 | 0 | .await?; |
254 | 0 | info!(?search_results, "search results"); |
255 | | } |
256 | | _ => { |
257 | 0 | let mut data = TestSchedulerData { |
258 | 0 | key: "test:scheduler_key_1".to_string(), |
259 | 0 | content: "Test scheduler data #1".to_string(), |
260 | 0 | version: 0, |
261 | 0 | }; |
262 | | |
263 | 0 | let res = store_clone.get_and_decode(data.clone()).await?; |
264 | 0 | if let Some(existing_data) = res { Branch (264:32): [Folded - Ignored]
|
265 | 0 | data.version = existing_data.version + 1; |
266 | 0 | } |
267 | | |
268 | 0 | store_clone.update_data(data).await?; |
269 | | } |
270 | | } |
271 | 0 | Ok(()) |
272 | 0 | } |
273 | 0 | match run_action(action_value, store_clone).await { |
274 | 0 | Ok(()) => {} |
275 | 0 | Err(e) => { |
276 | 0 | error!(?e, "Error!"); |
277 | 0 | *local_fail.write().unwrap() = true; |
278 | | } |
279 | | } |
280 | 0 | local_in_flight.fetch_sub(1, Ordering::Relaxed); |
281 | 0 | }); |
282 | | } |
283 | 0 | } |
284 | | |
285 | 0 | fn main() -> Result<(), Box<dyn core::error::Error>> { |
286 | 0 | let args = Args::parse(); |
287 | 0 | let redis_mode: RedisMode = args.redis_mode.into(); |
288 | | |
289 | 0 | let failed = Arc::new(RwLock::new(false)); |
290 | 0 | let redis_host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); |
291 | 0 | let max_client_permits = env::var("MAX_REDIS_PERMITS") |
292 | 0 | .unwrap_or_else(|_| "100".to_string()) |
293 | 0 | .parse()?; |
294 | 0 | let max_loops: usize = env::var("MAX_LOOPS") |
295 | 0 | .unwrap_or_else(|_| "2000000".to_string()) |
296 | 0 | .parse()?; |
297 | | |
298 | | #[expect( |
299 | | clippy::disallowed_methods, |
300 | | reason = "`We need `tokio::runtime::Runtime::block_on` so we can get errors _after_ threads finished" |
301 | | )] |
302 | 0 | tokio::runtime::Builder::new_multi_thread() |
303 | 0 | .enable_all() |
304 | 0 | .build() |
305 | 0 | .unwrap() |
306 | 0 | .block_on(async { |
307 | | // The OTLP exporters need to run in a Tokio context. |
308 | 0 | spawn!("init tracing", async { init_tracing() }) |
309 | 0 | .await? |
310 | 0 | .expect("Init tracing should work"); |
311 | | |
312 | 0 | let redis_port = match redis_mode { |
313 | 0 | RedisMode::Standard => 6379, |
314 | 0 | RedisMode::Sentinel => 26379, |
315 | 0 | RedisMode::Cluster => 7000, |
316 | | }; |
317 | 0 | let addr = match redis_mode { |
318 | 0 | RedisMode::Sentinel => format!("redis+sentinel://{redis_host}:{redis_port}/"), |
319 | 0 | _ => format!("redis://{redis_host}:{redis_port}/"), |
320 | | }; |
321 | 0 | let spec = RedisSpec { |
322 | 0 | addresses: vec![addr], |
323 | 0 | connection_timeout_ms: 1000, |
324 | 0 | max_client_permits, |
325 | 0 | mode: redis_mode, |
326 | 0 | ..Default::default() |
327 | 0 | }; |
328 | 0 | match spec.mode { |
329 | | RedisMode::Standard | RedisMode::Sentinel => { |
330 | 0 | let store = RedisStore::new_standard(spec).await?; |
331 | 0 | run(store, max_loops, failed.clone(), args.mode).await |
332 | | } |
333 | | RedisMode::Cluster => { |
334 | 0 | let store = RedisStore::new_cluster(spec).await?; |
335 | 0 | run(store, max_loops, failed.clone(), args.mode).await |
336 | | } |
337 | | } |
338 | 0 | }) |
339 | 0 | .unwrap(); |
340 | 0 | if *failed.read().unwrap() { Branch (340:8): [Folded - Ignored]
|
341 | 0 | return Err(Error::new(Code::Internal, "Failed in redis_store_tester".to_string()).into()); |
342 | 0 | } |
343 | 0 | Ok(()) |
344 | 0 | } |