Coverage Report

Created: 2026-02-23 10:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/src/bin/redis_store_tester.rs
Line
Count
Source
1
use core::sync::atomic::{AtomicUsize, Ordering};
2
use core::time::Duration;
3
use std::borrow::Cow;
4
use std::env;
5
use std::sync::{Arc, RwLock};
6
7
use bytes::Bytes;
8
use clap::{Parser, ValueEnum};
9
use futures::TryStreamExt;
10
use nativelink_config::stores::{RedisMode, RedisSpec};
11
use nativelink_error::{Code, Error, ResultExt};
12
use nativelink_store::redis_store::RedisStore;
13
use nativelink_util::buf_channel::make_buf_channel_pair;
14
use nativelink_util::store_trait::{
15
    SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore,
16
    SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider, StoreDriver,
17
    StoreKey, StoreLike, TrueValue, UploadSizeInfo,
18
};
19
use nativelink_util::telemetry::init_tracing;
20
use nativelink_util::{background_spawn, spawn};
21
use rand::Rng;
22
use tokio::time::sleep;
23
use tracing::{error, info};
24
25
// Define test structures that implement the scheduler traits
26
#[derive(Debug, Clone, PartialEq)]
27
struct TestSchedulerData {
28
    key: String,
29
    content: String,
30
    version: i64,
31
}
32
33
#[derive(Debug)]
34
struct TestSchedulerReturn {
35
    version: i64,
36
}
37
38
impl SchedulerStoreKeyProvider for TestSchedulerData {
39
    type Versioned = TrueValue; // Using versioned storage
40
41
0
    fn get_key(&self) -> StoreKey<'static> {
42
0
        StoreKey::Str(Cow::Owned(self.key.clone()))
43
0
    }
44
}
45
46
impl SchedulerStoreDataProvider for TestSchedulerData {
47
0
    fn try_into_bytes(self) -> Result<Bytes, Error> {
48
0
        Ok(Bytes::from(self.content.into_bytes()))
49
0
    }
50
51
0
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
52
        // Add some test indexes - need to use 'static strings
53
0
        Ok(vec![
54
0
            ("test_index", Bytes::from("test_value")),
55
0
            (
56
0
                "content_prefix",
57
0
                Bytes::from(self.content.chars().take(10).collect::<String>()),
58
0
            ),
59
0
        ])
60
0
    }
61
}
62
63
impl SchedulerStoreDecodeTo for TestSchedulerData {
64
    type DecodeOutput = TestSchedulerReturn;
65
66
0
    fn decode(version: i64, _data: Bytes) -> Result<Self::DecodeOutput, Error> {
67
0
        Ok(TestSchedulerReturn { version })
68
0
    }
69
}
70
71
impl SchedulerCurrentVersionProvider for TestSchedulerData {
72
0
    fn current_version(&self) -> i64 {
73
0
        self.version
74
0
    }
75
}
76
77
struct SearchByContentPrefix {
78
    prefix: String,
79
}
80
81
impl SchedulerIndexProvider for SearchByContentPrefix {
82
    const KEY_PREFIX: &'static str = "test:";
83
    const INDEX_NAME: &'static str = "content_prefix";
84
    type Versioned = TrueValue;
85
86
0
    fn index_value(&self) -> Cow<'_, str> {
87
0
        Cow::Borrowed(&self.prefix)
88
0
    }
89
}
90
91
impl SchedulerStoreKeyProvider for SearchByContentPrefix {
92
    type Versioned = TrueValue;
93
94
0
    fn get_key(&self) -> StoreKey<'static> {
95
0
        StoreKey::Str(Cow::Owned("dummy_key".to_string()))
96
0
    }
97
}
98
99
impl SchedulerStoreDecodeTo for SearchByContentPrefix {
100
    type DecodeOutput = TestSchedulerReturn;
101
102
0
    fn decode(version: i64, data: Bytes) -> Result<Self::DecodeOutput, Error> {
103
0
        TestSchedulerData::decode(version, data)
104
0
    }
105
}
106
107
const MAX_KEY: u16 = 1024;
108
109
/// Wrapper type for CLI parsing since we can't implement foreign traits on foreign types.
110
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, ValueEnum)]
111
enum RedisModeArg {
112
    Cluster,
113
    Sentinel,
114
    #[default]
115
    Standard,
116
}
117
118
impl From<RedisModeArg> for RedisMode {
119
0
    fn from(arg: RedisModeArg) -> Self {
120
0
        match arg {
121
0
            RedisModeArg::Standard => Self::Standard,
122
0
            RedisModeArg::Sentinel => Self::Sentinel,
123
0
            RedisModeArg::Cluster => Self::Cluster,
124
        }
125
0
    }
126
}
127
128
0
fn random_key() -> StoreKey<'static> {
129
0
    let key = rand::rng().random_range(0..MAX_KEY);
130
0
    StoreKey::new_str(&key.to_string()).into_owned()
131
0
}
132
133
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, ValueEnum)]
134
enum TestMode {
135
    #[default]
136
    Random,
137
    Sequential,
138
}
139
140
#[derive(Parser, Debug)]
141
#[command(version, about)]
142
struct Args {
143
    #[arg(value_enum, short, long, default_value_t)]
144
    redis_mode: RedisModeArg,
145
146
    #[arg(value_enum, short, long, default_value_t)]
147
    mode: TestMode,
148
}
149
150
0
async fn run<S: StoreDriver + SchedulerStore>(
151
0
    store: Arc<S>,
152
0
    max_loops: usize,
153
0
    failed: Arc<RwLock<bool>>,
154
0
    mode: TestMode,
155
0
) -> Result<(), Error> {
156
0
    let mut count = 0;
157
0
    let in_flight = Arc::new(AtomicUsize::new(0));
158
159
    loop {
160
0
        if count % 1000 == 0 {
  Branch (160:12): [Folded - Ignored]
161
0
            info!(
162
0
                "Loop count {count}. In flight: {}",
163
0
                in_flight.load(Ordering::Relaxed)
164
            );
165
0
            if *failed.read().unwrap() {
  Branch (165:16): [Folded - Ignored]
166
0
                return Err(Error::new(
167
0
                    Code::Internal,
168
0
                    "Failed in redis_store_tester".to_string(),
169
0
                ));
170
0
            }
171
0
        }
172
0
        if count == max_loops {
  Branch (172:12): [Folded - Ignored]
173
            loop {
174
0
                let remaining = in_flight.load(Ordering::Relaxed);
175
0
                if remaining == 0 {
  Branch (175:20): [Folded - Ignored]
176
0
                    return Ok(());
177
0
                }
178
0
                info!(remaining, "Remaining");
179
0
                sleep(Duration::from_secs(1)).await;
180
            }
181
0
        }
182
0
        count += 1;
183
0
        in_flight.fetch_add(1, Ordering::Relaxed);
184
185
0
        let store_clone = store.clone();
186
0
        let local_fail = failed.clone();
187
0
        let local_in_flight = in_flight.clone();
188
189
0
        let max_action_value = 7;
190
0
        let action_value = match mode {
191
0
            TestMode::Random => rand::rng().random_range(0..max_action_value),
192
0
            TestMode::Sequential => count % max_action_value,
193
        };
194
195
0
        background_spawn!("action", async move {
196
0
            async fn run_action<S: StoreDriver + SchedulerStore>(
197
0
                action_value: usize,
198
0
                store_clone: Arc<S>,
199
0
            ) -> Result<(), Error> {
200
0
                match action_value {
201
                    0 => {
202
0
                        store_clone.has(random_key()).await?;
203
                    }
204
                    1 => {
205
0
                        let (mut tx, rx) = make_buf_channel_pair();
206
0
                        tx.send(Bytes::from_static(b"12345")).await?;
207
0
                        tx.send_eof()?;
208
0
                        store_clone
209
0
                            .update(random_key(), rx, UploadSizeInfo::ExactSize(5))
210
0
                            .await?;
211
                    }
212
                    2 => {
213
0
                        let mut results = (0..MAX_KEY).map(|_| None).collect::<Vec<_>>();
214
215
0
                        store_clone
216
0
                            .has_with_results(
217
0
                                &(0..MAX_KEY)
218
0
                                    .map(|i| StoreKey::Str(Cow::Owned(i.to_string())))
219
0
                                    .collect::<Vec<_>>(),
220
0
                                &mut results,
221
                            )
222
0
                            .await?;
223
                    }
224
                    3 => {
225
0
                        store_clone
226
0
                            .update_oneshot(random_key(), Bytes::from_static(b"1234"))
227
0
                            .await?;
228
                    }
229
                    4 => {
230
0
                        let res = store_clone
231
0
                            .list(.., |_key| true)
232
0
                            .await
233
0
                            .err_tip(|| "In list")?;
234
0
                        info!(%res, "end list");
235
                    }
236
                    5 => {
237
0
                        let search_provider = SearchByContentPrefix {
238
0
                            prefix: "Searchable".to_string(),
239
0
                        };
240
0
                        for i in 0..5 {
241
0
                            let data = TestSchedulerData {
242
0
                                key: format!("test:search_key_{i}"),
243
0
                                content: format!("Searchable content #{i}"),
244
0
                                version: 0,
245
0
                            };
246
247
0
                            store_clone.update_data(data).await?;
248
                        }
249
0
                        let search_results: Vec<_> = store_clone
250
0
                            .search_by_index_prefix(search_provider)
251
0
                            .await?
252
0
                            .try_collect()
253
0
                            .await?;
254
0
                        info!(?search_results, "search results");
255
                    }
256
                    _ => {
257
0
                        let mut data = TestSchedulerData {
258
0
                            key: "test:scheduler_key_1".to_string(),
259
0
                            content: "Test scheduler data #1".to_string(),
260
0
                            version: 0,
261
0
                        };
262
263
0
                        let res = store_clone.get_and_decode(data.clone()).await?;
264
0
                        if let Some(existing_data) = res {
  Branch (264:32): [Folded - Ignored]
265
0
                            data.version = existing_data.version + 1;
266
0
                        }
267
268
0
                        store_clone.update_data(data).await?;
269
                    }
270
                }
271
0
                Ok(())
272
0
            }
273
0
            match run_action(action_value, store_clone).await {
274
0
                Ok(()) => {}
275
0
                Err(e) => {
276
0
                    error!(?e, "Error!");
277
0
                    *local_fail.write().unwrap() = true;
278
                }
279
            }
280
0
            local_in_flight.fetch_sub(1, Ordering::Relaxed);
281
0
        });
282
    }
283
0
}
284
285
0
fn main() -> Result<(), Box<dyn core::error::Error>> {
286
0
    let args = Args::parse();
287
0
    let redis_mode: RedisMode = args.redis_mode.into();
288
289
0
    let failed = Arc::new(RwLock::new(false));
290
0
    let redis_host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
291
0
    let max_client_permits = env::var("MAX_REDIS_PERMITS")
292
0
        .unwrap_or_else(|_| "100".to_string())
293
0
        .parse()?;
294
0
    let max_loops: usize = env::var("MAX_LOOPS")
295
0
        .unwrap_or_else(|_| "2000000".to_string())
296
0
        .parse()?;
297
298
    #[expect(
299
        clippy::disallowed_methods,
300
        reason = "`We need `tokio::runtime::Runtime::block_on` so we can get errors _after_ threads finished"
301
    )]
302
0
    tokio::runtime::Builder::new_multi_thread()
303
0
        .enable_all()
304
0
        .build()
305
0
        .unwrap()
306
0
        .block_on(async {
307
            // The OTLP exporters need to run in a Tokio context.
308
0
            spawn!("init tracing", async { init_tracing() })
309
0
                .await?
310
0
                .expect("Init tracing should work");
311
312
0
            let redis_port = match redis_mode {
313
0
                RedisMode::Standard => 6379,
314
0
                RedisMode::Sentinel => 26379,
315
0
                RedisMode::Cluster => 7000,
316
            };
317
0
            let addr = match redis_mode {
318
0
                RedisMode::Sentinel => format!("redis+sentinel://{redis_host}:{redis_port}/"),
319
0
                _ => format!("redis://{redis_host}:{redis_port}/"),
320
            };
321
0
            let spec = RedisSpec {
322
0
                addresses: vec![addr],
323
0
                connection_timeout_ms: 1000,
324
0
                max_client_permits,
325
0
                mode: redis_mode,
326
0
                ..Default::default()
327
0
            };
328
0
            match spec.mode {
329
                RedisMode::Standard | RedisMode::Sentinel => {
330
0
                    let store = RedisStore::new_standard(spec).await?;
331
0
                    run(store, max_loops, failed.clone(), args.mode).await
332
                }
333
                RedisMode::Cluster => {
334
0
                    let store = RedisStore::new_cluster(spec).await?;
335
0
                    run(store, max_loops, failed.clone(), args.mode).await
336
                }
337
            }
338
0
        })
339
0
        .unwrap();
340
0
    if *failed.read().unwrap() {
  Branch (340:8): [Folded - Ignored]
341
0
        return Err(Error::new(Code::Internal, "Failed in redis_store_tester".to_string()).into());
342
0
    }
343
0
    Ok(())
344
0
}