Coverage Report

Created: 2026-02-23 10:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-redis-tester/src/fake_redis.rs
Line
Count
Source
1
// Copyright 2026 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::Write;
16
use core::hash::BuildHasher;
17
use std::collections::HashMap;
18
19
use nativelink_util::background_spawn;
20
use redis::Value;
21
use redis_test::IntoRedisValue;
22
use tokio::io::{AsyncReadExt, AsyncWriteExt};
23
use tokio::net::TcpListener;
24
use tracing::{error, info, warn};
25
26
29
fn cmd_as_string(cmd: &redis::Cmd) -> String {
27
29
    let raw = cmd.get_packed_command();
28
29
    String::from_utf8(raw).unwrap()
29
29
}
30
31
592
pub(crate) fn arg_as_string(output: &mut String, arg: Value) {
32
592
    match arg {
33
3
        Value::SimpleString(s) => {
34
3
            write!(output, "+{s}\r\n").unwrap();
35
3
        }
36
41
        Value::Okay => {
37
41
            write!(output, "+OK\r\n").unwrap();
38
41
        }
39
171
        Value::BulkString(s) => {
40
171
            write!(
41
171
                output,
42
171
                "${}\r\n{}\r\n",
43
171
                s.len(),
44
171
                str::from_utf8(&s).unwrap()
45
171
            )
46
171
            .unwrap();
47
171
        }
48
155
        Value::Int(v) => {
49
155
            write!(output, ":{v}\r\n").unwrap();
50
155
        }
51
170
        Value::Array(values) => {
52
170
            write!(output, "*{}\r\n", values.len()).unwrap();
53
513
            for 
value343
in values {
54
343
                arg_as_string(output, value);
55
343
            }
56
        }
57
4
        Value::Map(values) => {
58
4
            write!(output, "%{}\r\n", values.len()).unwrap();
59
20
            for (
key16
,
value16
) in values {
60
16
                arg_as_string(output, key);
61
16
                arg_as_string(output, value);
62
16
            }
63
        }
64
48
        Value::Nil => {
65
48
            write!(output, "_\r\n").unwrap();
66
48
        }
67
        _ => {
68
0
            panic!("No support for {arg:?}")
69
        }
70
    }
71
592
}
72
73
29
fn args_as_string(args: Vec<Value>) -> String {
74
29
    let mut output = String::new();
75
80
    for 
arg51
in args {
76
51
        arg_as_string(&mut output, arg);
77
51
    }
78
29
    output
79
29
}
80
81
29
fn add_to_response<B: BuildHasher>(
82
29
    response: &mut HashMap<String, String, B>,
83
29
    cmd: &redis::Cmd,
84
29
    args: Vec<Value>,
85
29
) {
86
29
    response.insert(cmd_as_string(cmd), args_as_string(args));
87
29
}
88
89
10
fn setinfo(responses: &mut HashMap<String, String>) {
90
    // Library sends both lib-name and lib-ver in one go, so we respond to both
91
10
    add_to_response(
92
10
        responses,
93
10
        redis::cmd("CLIENT")
94
10
            .arg("SETINFO")
95
10
            .arg("LIB-NAME")
96
10
            .arg("redis-rs"),
97
10
        vec![Value::Okay, Value::Okay],
98
    );
99
10
}
100
101
6
pub fn add_lua_script<B: BuildHasher>(
102
6
    responses: &mut HashMap<String, String, B>,
103
6
    lua_script: &str,
104
6
    hash: &str,
105
6
) {
106
6
    add_to_response(
107
6
        responses,
108
6
        redis::cmd("SCRIPT").arg("LOAD").arg(lua_script),
109
6
        vec![hash.into_redis_value()],
110
    );
111
6
}
112
113
6
pub fn fake_redis_stream() -> HashMap<String, String> {
114
6
    let mut responses = HashMap::new();
115
6
    setinfo(&mut responses);
116
    // Does setinfo as well, so need to respond to all 3
117
6
    add_to_response(
118
6
        &mut responses,
119
6
        redis::cmd("SELECT").arg("3"),
120
6
        vec![Value::Okay, Value::Okay, Value::Okay],
121
    );
122
6
    responses
123
6
}
124
125
3
pub fn fake_redis_sentinel_master_stream() -> HashMap<String, String> {
126
3
    let mut response = fake_redis_stream();
127
3
    add_to_response(
128
3
        &mut response,
129
3
        &redis::cmd("ROLE"),
130
3
        vec![Value::Array(vec![
131
3
            "master".into_redis_value(),
132
3
            0.into_redis_value(),
133
3
            Value::Array(vec![]),
134
3
        ])],
135
    );
136
3
    response
137
3
}
138
139
4
pub fn fake_redis_sentinel_stream(master_name: &str, redis_port: u16) -> HashMap<String, String> {
140
4
    let mut response = HashMap::new();
141
4
    setinfo(&mut response);
142
143
    // Not a full "sentinel masters" response, but enough for redis-rs
144
4
    let resp: Vec<(Value, Value)> = vec![
145
4
        ("name".into_redis_value(), master_name.into_redis_value()),
146
4
        ("ip".into_redis_value(), "127.0.0.1".into_redis_value()),
147
4
        (
148
4
            "port".into_redis_value(),
149
4
            i64::from(redis_port).into_redis_value(),
150
4
        ),
151
4
        ("flags".into_redis_value(), "master".into_redis_value()),
152
    ];
153
154
4
    add_to_response(
155
4
        &mut response,
156
4
        redis::cmd("SENTINEL").arg("MASTERS"),
157
4
        vec![Value::Array(vec![Value::Map(resp)])],
158
    );
159
4
    response
160
4
}
161
162
14
pub(crate) async fn fake_redis_internal<H>(listener: TcpListener, handler: H)
163
14
where
164
14
    H: Fn(&[u8]) -> String + Send + Clone + 'static,
165
14
{
166
    loop {
167
39
        info!(
168
39
            "Waiting for connection on {}",
169
39
            listener.local_addr().unwrap()
170
        );
171
39
        let Ok((
mut stream25
, _)) = listener.accept().await else {
  Branch (171:13): [Folded - Ignored]
  Branch (171:13): [Folded - Ignored]
  Branch (171:13): [True: 6, False: 0]
  Branch (171:13): [True: 19, False: 0]
172
0
            error!("accept error");
173
0
            panic!("error");
174
        };
175
25
        info!("Accepted new connection");
176
25
        let local_handler = handler.clone();
177
25
        background_spawn!("thread", async move {
178
            loop {
179
8.78k
                let mut buf = vec![0; 8192];
180
8.78k
                let 
res8.76k
= stream.read(&mut buf).await.
unwrap8.76k
();
181
8.76k
                if res != 0 {
  Branch (181:20): [Folded - Ignored]
  Branch (181:20): [Folded - Ignored]
  Branch (181:20): [True: 154, False: 0]
  Branch (181:20): [True: 161, False: 8.44k]
182
315
                    let output = local_handler(&buf[..res]);
183
315
                    if !output.is_empty() {
  Branch (183:24): [Folded - Ignored]
  Branch (183:24): [Folded - Ignored]
  Branch (183:24): [True: 154, False: 0]
  Branch (183:24): [True: 32, False: 129]
184
186
                        stream.write_all(output.as_bytes()).await.unwrap();
185
129
                    }
186
8.44k
                }
187
            }
188
        });
189
    }
190
}
191
192
11
async fn fake_redis<B>(listener: TcpListener, responses: HashMap<String, String, B>)
193
11
where
194
11
    B: BuildHasher + Clone + Send + 'static,
195
11
{
196
11
    info!("Responses are: {:?}", responses);
197
11
    let values = responses.clone();
198
161
    let 
inner11
= move |buf: &[u8]| -> String {
199
161
        let str_buf = str::from_utf8(buf);
200
161
        if let Ok(
s32
) = str_buf {
  Branch (200:16): [Folded - Ignored]
  Branch (200:16): [Folded - Ignored]
  Branch (200:16): [True: 32, False: 129]
201
67
            for (key, value) in &values {
202
67
                if s.starts_with(key) {
  Branch (202:20): [Folded - Ignored]
  Branch (202:20): [Folded - Ignored]
  Branch (202:20): [True: 32, False: 35]
203
32
                    info!("Responding to {}", s.replace("\r\n", "\\r\\n"));
204
32
                    return value.clone();
205
35
                }
206
            }
207
0
            warn!("Unknown command: {s}");
208
        } else {
209
129
            warn!("Bytes buffer: {:?}", &buf);
210
        }
211
129
        String::new()
212
161
    };
213
11
    fake_redis_internal(listener, inner).await;
214
0
}
215
216
11
pub async fn make_fake_redis_with_responses<B: BuildHasher + Clone + Send + 'static>(
217
11
    responses: HashMap<String, String, B>,
218
11
) -> u16 {
219
11
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
220
11
    let port = listener.local_addr().unwrap().port();
221
11
    info!("Using port {port}");
222
223
11
    background_spawn!("listener", async move {
224
11
        fake_redis(listener, responses).await;
225
0
    });
226
227
11
    port
228
11
}