/build/source/src/bin/redis_store_tester.rs
Line | Count | Source |
1 | | use core::sync::atomic::{AtomicUsize, Ordering}; |
2 | | use std::borrow::Cow; |
3 | | use std::env; |
4 | | use std::sync::{Arc, RwLock}; |
5 | | |
6 | | use bytes::Bytes; |
7 | | use nativelink_config::stores::RedisSpec; |
8 | | use nativelink_error::{Code, Error}; |
9 | | use nativelink_store::redis_store::RedisStore; |
10 | | use nativelink_util::buf_channel::make_buf_channel_pair; |
11 | | use nativelink_util::store_trait::{ |
12 | | SchedulerCurrentVersionProvider, SchedulerStore, SchedulerStoreDataProvider, |
13 | | SchedulerStoreDecodeTo, SchedulerStoreKeyProvider, StoreKey, StoreLike, TrueValue, |
14 | | UploadSizeInfo, |
15 | | }; |
16 | | use nativelink_util::telemetry::init_tracing; |
17 | | use nativelink_util::{background_spawn, spawn}; |
18 | | use rand::Rng; |
19 | | use tracing::{error, info}; |
20 | | |
21 | | // Define test structures that implement the scheduler traits |
/// Test payload used to exercise the scheduler-store traits of `RedisStore`.
#[derive(Debug, Clone, PartialEq)]
struct TestSchedulerData {
    // Redis key this record is stored under.
    key: String,
    // Opaque payload written to the store.
    content: String,
    // Version used for optimistic-concurrency (versioned) updates.
    version: i64,
}
28 | | |
/// Decoded result of a scheduler-store read; only the stored version is kept.
struct TestSchedulerReturn {
    version: i64,
}
32 | | |
33 | | impl SchedulerStoreKeyProvider for TestSchedulerData { |
34 | | type Versioned = TrueValue; // Using versioned storage |
35 | | |
36 | 0 | fn get_key(&self) -> StoreKey<'static> { |
37 | 0 | StoreKey::Str(Cow::Owned(self.key.clone())) |
38 | 0 | } |
39 | | } |
40 | | |
41 | | impl SchedulerStoreDataProvider for TestSchedulerData { |
42 | 0 | fn try_into_bytes(self) -> Result<Bytes, Error> { |
43 | 0 | Ok(Bytes::from(self.content.into_bytes())) |
44 | 0 | } |
45 | | |
46 | 0 | fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> { |
47 | | // Add some test indexes - need to use 'static strings |
48 | 0 | Ok(vec![ |
49 | 0 | ("test_index", Bytes::from("test_value")), |
50 | 0 | ( |
51 | 0 | "content_prefix", |
52 | 0 | Bytes::from(self.content.chars().take(10).collect::<String>()), |
53 | 0 | ), |
54 | 0 | ]) |
55 | 0 | } |
56 | | } |
57 | | |
58 | | impl SchedulerStoreDecodeTo for TestSchedulerData { |
59 | | type DecodeOutput = TestSchedulerReturn; |
60 | | |
61 | 0 | fn decode(version: i64, _data: Bytes) -> Result<Self::DecodeOutput, Error> { |
62 | 0 | Ok(TestSchedulerReturn { version }) |
63 | 0 | } |
64 | | } |
65 | | |
66 | | impl SchedulerCurrentVersionProvider for TestSchedulerData { |
67 | 0 | fn current_version(&self) -> i64 { |
68 | 0 | self.version |
69 | 0 | } |
70 | | } |
71 | | |
72 | | const MAX_KEY: u16 = 1024; |
73 | | |
74 | 0 | fn random_key() -> StoreKey<'static> { |
75 | 0 | let key = rand::rng().random_range(0..MAX_KEY); |
76 | 0 | StoreKey::new_str(&key.to_string()).into_owned() |
77 | 0 | } |
78 | | |
/// Stress-tester for `RedisStore`: spawns many concurrent random store
/// operations against a Redis server and exits non-zero on the first failure.
///
/// Configuration via env vars: `REDIS_HOST` (default `127.0.0.1`),
/// `MAX_REDIS_PERMITS` (default `100`), `MAX_LOOPS` (default `2000000`).
fn main() -> Result<(), Box<dyn core::error::Error>> {
    // Set to true by any background task that hits an error.
    let failed = Arc::new(RwLock::new(false));
    let redis_host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
    let max_client_permits = env::var("MAX_REDIS_PERMITS")
        .unwrap_or_else(|_| "100".to_string())
        .parse()?;
    let max_loops: usize = env::var("MAX_LOOPS")
        .unwrap_or_else(|_| "2000000".to_string())
        .parse()?;

    #[expect(
        clippy::disallowed_methods,
        reason = "`We need `tokio::runtime::Runtime::block_on` so we can get errors _after_ threads finished"
    )]
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            // The OTLP exporters need to run in a Tokio context.
            spawn!("init tracing", async { init_tracing() })
                .await?
                .expect("Init tracing should work");

            let spec = RedisSpec {
                addresses: vec![format!("redis://{redis_host}:6379/")],
                connection_timeout_ms: 1000,
                max_client_permits,
                ..Default::default()
            };
            let store = RedisStore::new(spec)?;
            let mut count = 0;
            // Number of spawned actions that have not finished yet.
            let in_flight = Arc::new(AtomicUsize::new(0));

            loop {
                // Periodically log progress and bail out if any prior
                // background action reported a failure.
                if count % 1000 == 0 {
                    info!(
                        "Loop count {count}. In flight: {}",
                        in_flight.load(Ordering::Relaxed)
                    );
                    if *failed.read().unwrap() {
                        return Err(Error::new(
                            Code::Internal,
                            "Failed in redis_store_tester".to_string(),
                        ));
                    }
                }
                if count == max_loops {
                    return Ok(());
                }
                count += 1;
                in_flight.fetch_add(1, Ordering::Relaxed);

                let store_clone = store.clone();
                let local_fail = failed.clone();
                let local_in_flight = in_flight.clone();

                // Fire-and-forget: errors are surfaced via `local_fail`,
                // not via the spawned task's return value.
                background_spawn!("action", async move {
                    // Runs one randomly chosen store operation.
                    async fn run_action(store_clone: Arc<RedisStore>) -> Result<(), Error> {
                        let action_value = rand::rng().random_range(0..5);
                        match action_value {
                            0 => {
                                store_clone.has(random_key()).await?;
                            }
                            1 => {
                                // Streamed upload through a buffer channel.
                                let (mut tx, rx) = make_buf_channel_pair();
                                tx.send(Bytes::from_static(b"12345")).await?;
                                tx.send_eof()?;
                                store_clone
                                    .update(random_key(), rx, UploadSizeInfo::ExactSize(5))
                                    .await?;
                            }
                            2 => {
                                // Bulk existence check over the full key space.
                                let mut results = (0..MAX_KEY).map(|_| None).collect::<Vec<_>>();

                                store_clone
                                    .has_with_results(
                                        &(0..MAX_KEY)
                                            .map(|i| StoreKey::Str(Cow::Owned(i.to_string())))
                                            .collect::<Vec<_>>(),
                                        &mut results,
                                    )
                                    .await?;
                            }
                            3 => {
                                store_clone
                                    .update_oneshot(random_key(), Bytes::from_static(b"1234"))
                                    .await?;
                            }
                            _ => {
                                // Versioned scheduler-data read-modify-write.
                                let mut data = TestSchedulerData {
                                    key: "test:scheduler_key_1".to_string(),
                                    content: "Test scheduler data #1".to_string(),
                                    version: 0,
                                };

                                let res = store_clone.get_and_decode(data.clone()).await?;
                                if let Some(existing_data) = res {
                                    // Bump past the stored version so the
                                    // versioned write is accepted.
                                    data.version = existing_data.version + 1;
                                }

                                store_clone.update_data(data).await?;
                            }
                        }
                        Ok(())
                    }
                    match run_action(store_clone).await {
                        Ok(()) => {}
                        Err(e) => {
                            error!(?e, "Error!");
                            *local_fail.write().unwrap() = true;
                        }
                    }
                    local_in_flight.fetch_sub(1, Ordering::Relaxed);
                });
            }
        })
        .unwrap();
    // Catch failures from tasks that were still in flight when the loop
    // exited successfully (the in-loop check only runs every 1000 loops).
    if *failed.read().unwrap() {
        return Err(Error::new(Code::Internal, "Failed in redis_store_tester".to_string()).into());
    }
    Ok(())
}