Coverage Report

Created: 2026-04-14 11:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-redis-tester/src/fake_redis.rs
Line
Count
Source
1
// Copyright 2026 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::Write;
16
use core::hash::BuildHasher;
17
use std::collections::HashMap;
18
19
use nativelink_util::background_spawn;
20
use redis::Value;
21
use redis_test::IntoRedisValue;
22
use tokio::io::{AsyncReadExt, AsyncWriteExt};
23
use tokio::net::TcpListener;
24
use tracing::{error, info, warn};
25
26
61
fn cmd_as_string(cmd: &redis::Cmd) -> String {
27
61
    let raw = cmd.get_packed_command();
28
61
    String::from_utf8(raw).unwrap()
29
61
}
30
31
833
pub(crate) fn arg_as_string(output: &mut String, arg: Value) {
32
833
    match arg {
33
86
        Value::SimpleString(s) => {
34
86
            write!(output, "+{s}\r\n").unwrap();
35
86
        }
36
88
        Value::Okay => {
37
88
            write!(output, "+OK\r\n").unwrap();
38
88
        }
39
197
        Value::BulkString(s) => {
40
197
            write!(
41
197
                output,
42
197
                "${}\r\n{}\r\n",
43
197
                s.len(),
44
197
                str::from_utf8(&s).unwrap()
45
197
            )
46
197
            .unwrap();
47
197
        }
48
184
        Value::Int(v) => {
49
184
            write!(output, ":{v}\r\n").unwrap();
50
184
        }
51
184
        Value::Array(values) => {
52
184
            write!(output, "*{}\r\n", values.len()).unwrap();
53
363
            for value in 
values184
{
54
363
                arg_as_string(output, value);
55
363
            }
56
        }
57
46
        Value::Map(values) => {
58
46
            write!(output, "%{}\r\n", values.len()).unwrap();
59
67
            for (key, value) in 
values46
{
60
67
                arg_as_string(output, key);
61
67
                arg_as_string(output, value);
62
67
            }
63
        }
64
48
        Value::Nil => {
65
48
            write!(output, "_\r\n").unwrap();
66
48
        }
67
        _ => {
68
0
            panic!("No support for {arg:?}")
69
        }
70
    }
71
833
}
72
73
48
fn args_as_string(args: Vec<Value>) -> String {
74
48
    let mut output = String::new();
75
125
    for arg in 
args48
{
76
125
        arg_as_string(&mut output, arg);
77
125
    }
78
48
    output
79
48
}
80
81
22
pub fn add_to_response<B: BuildHasher>(
82
22
    response: &mut HashMap<String, String, B>,
83
22
    cmd: &redis::Cmd,
84
22
    args: Vec<Value>,
85
22
) {
86
22
    add_to_response_raw(response, cmd, args_as_string(args));
87
22
}
88
89
22
pub fn add_to_response_raw<B: BuildHasher>(
90
22
    response: &mut HashMap<String, String, B>,
91
22
    cmd: &redis::Cmd,
92
22
    args: String,
93
22
) {
94
22
    response.insert(cmd_as_string(cmd), args);
95
22
}
96
97
13
fn setinfo(responses: &mut HashMap<String, String>) {
98
    // We do raw inserts of command here, because the library sends 3/4 commands in one go
99
    // They always start with HELLO, then optionally SELECT, so we use this to differentiate
100
13
    let hello = cmd_as_string(redis::cmd("HELLO").arg("3"));
101
13
    let setinfo = cmd_as_string(
102
13
        redis::cmd("CLIENT")
103
13
            .arg("SETINFO")
104
13
            .arg("LIB-NAME")
105
13
            .arg("redis-rs"),
106
    );
107
13
    responses.insert(
108
13
        [hello.clone(), setinfo.clone()].join(""),
109
13
        args_as_string(vec![
110
13
            Value::Map(vec![(
111
13
                Value::SimpleString("server".into()),
112
13
                Value::SimpleString("redis".into()),
113
13
            )]),
114
13
            Value::Okay,
115
13
            Value::Okay,
116
        ]),
117
    );
118
13
    responses.insert(
119
13
        [hello, cmd_as_string(redis::cmd("SELECT").arg(3)), setinfo].join(""),
120
13
        args_as_string(vec![
121
13
            Value::Map(vec![(
122
13
                Value::SimpleString("server".into()),
123
13
                Value::SimpleString("redis".into()),
124
13
            )]),
125
13
            Value::Okay,
126
13
            Value::Okay,
127
13
            Value::Okay,
128
        ]),
129
    );
130
13
}
131
132
6
pub fn add_lua_script<B: BuildHasher>(
133
6
    responses: &mut HashMap<String, String, B>,
134
6
    lua_script: &str,
135
6
    hash: &str,
136
6
) {
137
6
    add_to_response(
138
6
        responses,
139
6
        redis::cmd("SCRIPT").arg("LOAD").arg(lua_script),
140
6
        vec![hash.into_redis_value()],
141
    );
142
6
}
143
144
6
pub fn fake_redis_stream() -> HashMap<String, String> {
145
6
    let mut responses = HashMap::new();
146
6
    setinfo(&mut responses);
147
    // Does setinfo as well, so need to respond to all 3
148
6
    add_to_response(
149
6
        &mut responses,
150
6
        redis::cmd("SELECT").arg("3"),
151
6
        vec![Value::Okay, Value::Okay, Value::Okay],
152
    );
153
6
    responses
154
6
}
155
156
3
pub fn fake_redis_sentinel_master_stream() -> HashMap<String, String> {
157
3
    let mut response = fake_redis_stream();
158
3
    add_to_response(
159
3
        &mut response,
160
3
        &redis::cmd("ROLE"),
161
3
        vec![Value::Array(vec![
162
3
            "master".into_redis_value(),
163
3
            0.into_redis_value(),
164
3
            Value::Array(vec![]),
165
3
        ])],
166
    );
167
3
    response
168
3
}
169
170
7
pub fn fake_redis_sentinel_stream(master_name: &str, redis_port: u16) -> HashMap<String, String> {
171
7
    let mut response = HashMap::new();
172
7
    setinfo(&mut response);
173
174
    // Not a full "sentinel masters" response, but enough for redis-rs
175
7
    let resp: Vec<(Value, Value)> = vec![
176
7
        ("name".into_redis_value(), master_name.into_redis_value()),
177
7
        ("ip".into_redis_value(), "127.0.0.1".into_redis_value()),
178
7
        (
179
7
            "port".into_redis_value(),
180
7
            i64::from(redis_port).into_redis_value(),
181
7
        ),
182
7
        ("flags".into_redis_value(), "master".into_redis_value()),
183
    ];
184
185
7
    add_to_response(
186
7
        &mut response,
187
7
        redis::cmd("SENTINEL").arg("MASTERS"),
188
7
        vec![Value::Array(vec![Value::Map(resp)])],
189
    );
190
7
    response
191
7
}
192
193
20
pub(crate) async fn fake_redis_internal<H>(listener: TcpListener, handlers: Vec<H>)
194
20
where
195
20
    H: Fn(&[u8]) -> String + Send + Clone + 'static + Sync,
196
20
{
197
20
    let mut handler_iter = handlers.iter().cloned().cycle();
198
    loop {
199
53
        info!(
200
            "Waiting for connection on {}",
201
53
            listener.local_addr().unwrap()
202
        );
203
53
        let Ok((
mut stream33
, _)) = listener.accept().await else {
204
0
            error!("accept error");
205
0
            panic!("error");
206
        };
207
33
        info!("Accepted new connection");
208
33
        let local_handler = handler_iter.next().unwrap();
209
33
        background_spawn!("thread", async move {
210
            loop {
211
102k
                let mut buf = vec![0; 8192];
212
102k
                let 
res102k
= stream.read(&mut buf).await.
unwrap102k
();
213
102k
                if res != 0 {
214
350
                    let output = local_handler(&buf[..res]);
215
350
                    if !output.is_empty() {
216
217
                        stream.write_all(output.as_bytes()).await.unwrap();
217
133
                    }
218
101k
                }
219
            }
220
        });
221
    }
222
}
223
224
14
async fn fake_redis<B>(listener: TcpListener, all_responses: Vec<HashMap<String, String, B>>)
225
14
where
226
14
    B: BuildHasher + Clone + Send + 'static + Sync,
227
14
{
228
14
    let funcs = all_responses
229
14
        .iter()
230
14
        .map(|responses| {
231
14
            info!("Responses are: {:?}", responses);
232
14
            let values = responses.clone();
233
169
            move |buf: &[u8]| -> String {
234
169
                let str_buf = String::from_utf8_lossy(buf).into_owned();
235
615
                for (key, value) in 
&values169
{
236
615
                    if str_buf.starts_with(key) {
237
36
                        info!("Responding to {}", str_buf.replace("\r\n", "\\r\\n"));
238
36
                        return value.clone();
239
579
                    }
240
                }
241
133
                warn!(
242
                    "Unknown command: {}",
243
133
                    str_buf.chars().take(1000).collect::<String>()
244
                );
245
133
                String::new()
246
169
            }
247
14
        })
248
14
        .collect();
249
14
    fake_redis_internal(listener, funcs).await;
250
0
}
251
252
14
pub async fn make_fake_redis_with_multiple_responses<
253
14
    B: BuildHasher + Clone + Send + 'static + Sync,
254
14
>(
255
14
    responses: Vec<HashMap<String, String, B>>,
256
14
) -> u16 {
257
14
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
258
14
    let port = listener.local_addr().unwrap().port();
259
14
    info!("Using port {port}");
260
261
14
    background_spawn!("listener", async move {
262
14
        fake_redis(listener, responses).await;
263
0
    });
264
265
14
    port
266
14
}
267
268
14
pub async fn make_fake_redis_with_responses<B: BuildHasher + Clone + Send + 'static + Sync>(
269
14
    responses: HashMap<String, String, B>,
270
14
) -> u16 {
271
14
    make_fake_redis_with_multiple_responses(vec![responses]).await
272
14
}