Coverage Report

Created: 2025-12-17 22:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/src/bin/redis_store_tester.rs
Line
Count
Source
1
use core::sync::atomic::{AtomicUsize, Ordering};
2
use std::borrow::Cow;
3
use std::env;
4
use std::sync::{Arc, RwLock};
5
6
use bytes::Bytes;
7
use nativelink_config::stores::RedisSpec;
8
use nativelink_error::{Code, Error};
9
use nativelink_store::redis_store::RedisStore;
10
use nativelink_util::buf_channel::make_buf_channel_pair;
11
use nativelink_util::store_trait::{
12
    SchedulerCurrentVersionProvider, SchedulerStore, SchedulerStoreDataProvider,
13
    SchedulerStoreDecodeTo, SchedulerStoreKeyProvider, StoreKey, StoreLike, TrueValue,
14
    UploadSizeInfo,
15
};
16
use nativelink_util::telemetry::init_tracing;
17
use nativelink_util::{background_spawn, spawn};
18
use rand::Rng;
19
use tracing::{error, info};
20
21
// Define test structures that implement the scheduler traits
22
#[derive(Debug, Clone, PartialEq)]
23
struct TestSchedulerData {
24
    key: String,
25
    content: String,
26
    version: i64,
27
}
28
29
struct TestSchedulerReturn {
30
    version: i64,
31
}
32
33
impl SchedulerStoreKeyProvider for TestSchedulerData {
34
    type Versioned = TrueValue; // Using versioned storage
35
36
0
    fn get_key(&self) -> StoreKey<'static> {
37
0
        StoreKey::Str(Cow::Owned(self.key.clone()))
38
0
    }
39
}
40
41
impl SchedulerStoreDataProvider for TestSchedulerData {
42
0
    fn try_into_bytes(self) -> Result<Bytes, Error> {
43
0
        Ok(Bytes::from(self.content.into_bytes()))
44
0
    }
45
46
0
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
47
        // Add some test indexes - need to use 'static strings
48
0
        Ok(vec![
49
0
            ("test_index", Bytes::from("test_value")),
50
0
            (
51
0
                "content_prefix",
52
0
                Bytes::from(self.content.chars().take(10).collect::<String>()),
53
0
            ),
54
0
        ])
55
0
    }
56
}
57
58
impl SchedulerStoreDecodeTo for TestSchedulerData {
59
    type DecodeOutput = TestSchedulerReturn;
60
61
0
    fn decode(version: i64, _data: Bytes) -> Result<Self::DecodeOutput, Error> {
62
0
        Ok(TestSchedulerReturn { version })
63
0
    }
64
}
65
66
impl SchedulerCurrentVersionProvider for TestSchedulerData {
67
0
    fn current_version(&self) -> i64 {
68
0
        self.version
69
0
    }
70
}
71
72
const MAX_KEY: u16 = 1024;
73
74
0
fn random_key() -> StoreKey<'static> {
75
0
    let key = rand::rng().random_range(0..MAX_KEY);
76
0
    StoreKey::new_str(&key.to_string()).into_owned()
77
0
}
78
79
0
fn main() -> Result<(), Box<dyn core::error::Error>> {
80
0
    let failed = Arc::new(RwLock::new(false));
81
0
    let redis_host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
82
0
    let max_client_permits = env::var("MAX_REDIS_PERMITS")
83
0
        .unwrap_or_else(|_| "100".to_string())
84
0
        .parse()?;
85
0
    let max_loops: usize = env::var("MAX_LOOPS")
86
0
        .unwrap_or_else(|_| "2000000".to_string())
87
0
        .parse()?;
88
89
    #[expect(
90
        clippy::disallowed_methods,
91
        reason = "`We need `tokio::runtime::Runtime::block_on` so we can get errors _after_ threads finished"
92
    )]
93
0
    tokio::runtime::Builder::new_multi_thread()
94
0
        .enable_all()
95
0
        .build()
96
0
        .unwrap()
97
0
        .block_on(async {
98
            // The OTLP exporters need to run in a Tokio context.
99
0
            spawn!("init tracing", async { init_tracing() })
100
0
                .await?
101
0
                .expect("Init tracing should work");
102
103
0
            let spec = RedisSpec {
104
0
                addresses: vec![format!("redis://{redis_host}:6379/")],
105
0
                connection_timeout_ms: 1000,
106
0
                max_client_permits,
107
0
                ..Default::default()
108
0
            };
109
0
            let store = RedisStore::new(spec)?;
110
0
            let mut count = 0;
111
0
            let in_flight = Arc::new(AtomicUsize::new(0));
112
113
            loop {
114
0
                if count % 1000 == 0 {
  Branch (114:20): [Folded - Ignored]
115
0
                    info!(
116
0
                        "Loop count {count}. In flight: {}",
117
0
                        in_flight.load(Ordering::Relaxed)
118
                    );
119
0
                    if *failed.read().unwrap() {
  Branch (119:24): [Folded - Ignored]
120
0
                        return Err(Error::new(
121
0
                            Code::Internal,
122
0
                            "Failed in redis_store_tester".to_string(),
123
0
                        ));
124
0
                    }
125
0
                }
126
0
                if count == max_loops {
  Branch (126:20): [Folded - Ignored]
127
0
                    return Ok(());
128
0
                }
129
0
                count += 1;
130
0
                in_flight.fetch_add(1, Ordering::Relaxed);
131
132
0
                let store_clone = store.clone();
133
0
                let local_fail = failed.clone();
134
0
                let local_in_flight = in_flight.clone();
135
136
0
                background_spawn!("action", async move {
137
0
                    async fn run_action(store_clone: Arc<RedisStore>) -> Result<(), Error> {
138
0
                        let action_value = rand::rng().random_range(0..5);
139
0
                        match action_value {
140
                            0 => {
141
0
                                store_clone.has(random_key()).await?;
142
                            }
143
                            1 => {
144
0
                                let (mut tx, rx) = make_buf_channel_pair();
145
0
                                tx.send(Bytes::from_static(b"12345")).await?;
146
0
                                tx.send_eof()?;
147
0
                                store_clone
148
0
                                    .update(random_key(), rx, UploadSizeInfo::ExactSize(5))
149
0
                                    .await?;
150
                            }
151
                            2 => {
152
0
                                let mut results = (0..MAX_KEY).map(|_| None).collect::<Vec<_>>();
153
154
0
                                store_clone
155
0
                                    .has_with_results(
156
0
                                        &(0..MAX_KEY)
157
0
                                            .map(|i| StoreKey::Str(Cow::Owned(i.to_string())))
158
0
                                            .collect::<Vec<_>>(),
159
0
                                        &mut results,
160
                                    )
161
0
                                    .await?;
162
                            }
163
                            3 => {
164
0
                                store_clone
165
0
                                    .update_oneshot(random_key(), Bytes::from_static(b"1234"))
166
0
                                    .await?;
167
                            }
168
                            _ => {
169
0
                                let mut data = TestSchedulerData {
170
0
                                    key: "test:scheduler_key_1".to_string(),
171
0
                                    content: "Test scheduler data #1".to_string(),
172
0
                                    version: 0,
173
0
                                };
174
175
0
                                let res = store_clone.get_and_decode(data.clone()).await?;
176
0
                                if let Some(existing_data) = res {
  Branch (176:40): [Folded - Ignored]
177
0
                                    data.version = existing_data.version + 1;
178
0
                                }
179
180
0
                                store_clone.update_data(data).await?;
181
                            }
182
                        }
183
0
                        Ok(())
184
0
                    }
185
0
                    match run_action(store_clone).await {
186
0
                        Ok(()) => {}
187
0
                        Err(e) => {
188
0
                            error!(?e, "Error!");
189
0
                            *local_fail.write().unwrap() = true;
190
                        }
191
                    }
192
0
                    local_in_flight.fetch_sub(1, Ordering::Relaxed);
193
0
                });
194
            }
195
0
        })
196
0
        .unwrap();
197
0
    if *failed.read().unwrap() {
  Branch (197:8): [Folded - Ignored]
198
0
        return Err(Error::new(Code::Internal, "Failed in redis_store_tester".to_string()).into());
199
0
    }
200
0
    Ok(())
201
0
}