Coverage Report

Created: 2026-05-23 21:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-redis-tester/src/read_only_redis.rs
Line
Count
Source
1
// Copyright 2026 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::Write;
16
use core::sync::atomic::{AtomicBool, Ordering};
17
use std::sync::Arc;
18
19
use either::Either;
20
use nativelink_util::background_spawn;
21
use redis::Value;
22
use redis_protocol::resp2::decode::decode;
23
use redis_protocol::resp2::types::OwnedFrame;
24
use tokio::net::TcpListener;
25
use tracing::info;
26
27
use crate::fake_redis::{arg_as_string, fake_redis_internal};
28
29
const FAKE_SCRIPT_SHA: &str = "5148c724ce419ea27d1971dcb61c111dbbc6b63e";
30
31
#[derive(Clone, Debug)]
32
pub struct ReadOnlyRedis {
33
    // The first time we hit SETRANGE/HMSET, we output a ReadOnly. Next time, we assume we're reconnected and do correct values
34
    readonly_triggered: Arc<AtomicBool>,
35
}
36
37
impl Default for ReadOnlyRedis {
38
0
    fn default() -> Self {
39
0
        Self::new()
40
0
    }
41
}
42
43
impl ReadOnlyRedis {
44
3
    pub fn new() -> Self {
45
3
        Self {
46
3
            readonly_triggered: Arc::new(AtomicBool::new(false)),
47
3
        }
48
3
    }
49
50
3
    async fn dynamic_fake_redis(self, listener: TcpListener) {
51
3
        let readonly_err_str = "READONLY You can't write against a read only replica.";
52
3
        let readonly_err = format!("!{}\r\n{readonly_err_str}\r\n", readonly_err_str.len());
53
54
28
        let 
inner3
= move |buf: &[u8]| -> String {
55
28
            let mut output = String::new();
56
28
            let mut buf_index = 0;
57
            loop {
58
48
                let frame = match decode(&buf[buf_index..]).unwrap() {
59
48
                    Some((frame, amt)) => {
60
48
                        buf_index += amt;
61
48
                        frame
62
                    }
63
                    None => {
64
0
                        panic!("No frame!");
65
                    }
66
                };
67
48
                let (cmd, args) = {
68
48
                    if let OwnedFrame::Array(a) = frame {
69
48
                        if let OwnedFrame::BulkString(s) = a.first().unwrap() {
70
48
                            let args: Vec<_> = a[1..].to_vec();
71
48
                            (str::from_utf8(s).unwrap().to_string(), args)
72
                        } else {
73
0
                            panic!("Array not starting with cmd: {a:?}");
74
                        }
75
                    } else {
76
0
                        panic!("Non array cmd: {frame:?}");
77
                    }
78
                };
79
80
48
                let ret: Either<Value, String> = match cmd.as_str() {
81
48
                    "HELLO" => 
Either::Left(Value::Map(10
vec!10
[(
82
10
                        Value::SimpleString("server".into()),
83
10
                        Value::SimpleString("redis".into()),
84
10
                    )])),
85
38
                    "CLIENT" => {
86
                        // We can safely ignore these, as it's just setting the library name/version
87
20
                        Either::Left(Value::Int(0))
88
                    }
89
18
                    "SCRIPT" => {
90
5
                        assert_eq!(args[0], OwnedFrame::BulkString(b"LOAD".to_vec()));
91
92
5
                        let OwnedFrame::BulkString(ref _script) = args[1] else {
93
0
                            panic!("Script should be a bulkstring: {args:?}");
94
                        };
95
5
                        Either::Left(Value::SimpleString(FAKE_SCRIPT_SHA.to_string()))
96
                    }
97
13
                    "ROLE" => 
Either::Left(Value::Array(5
vec!5
[
98
5
                        Value::BulkString(b"master".to_vec()),
99
5
                        Value::Int(0),
100
5
                        Value::Array(vec![]),
101
5
                    ])),
102
8
                    "SETRANGE" => {
103
2
                        let value = self.readonly_triggered.load(Ordering::Relaxed);
104
2
                        if value {
105
1
                            Either::Left(Value::Int(5))
106
                        } else {
107
1
                            self.readonly_triggered.store(true, Ordering::Relaxed);
108
1
                            Either::Right(readonly_err.clone())
109
                        }
110
                    }
111
6
                    "STRLEN" => 
Either::Left(Value::Int(5))1
,
112
5
                    "RENAME" | 
"HMSET"4
=> {
113
3
                        let value = self.readonly_triggered.load(Ordering::Relaxed);
114
3
                        if value {
115
2
                            Either::Left(Value::Okay)
116
                        } else {
117
1
                            self.readonly_triggered.store(true, Ordering::Relaxed);
118
1
                            Either::Right(readonly_err.clone())
119
                        }
120
                    }
121
2
                    "EVALSHA" => 
Either::Left(Value::Array(1
vec!1
[Value::Int(1), Value::Int(0)])),
122
1
                    "EXPIRE" => {
123
1
                        assert_eq!(args[1], OwnedFrame::BulkString(b"60".to_vec()));
124
1
                        let value = self.readonly_triggered.load(Ordering::Relaxed);
125
1
                        if value {
126
1
                            Either::Left(Value::Int(1))
127
                        } else {
128
0
                            self.readonly_triggered.store(true, Ordering::Relaxed);
129
0
                            Either::Right(readonly_err.clone())
130
                        }
131
                    }
132
0
                    actual => {
133
0
                        panic!("Mock command not implemented! {actual:?}");
134
                    }
135
                };
136
137
48
                match ret {
138
46
                    Either::Left(v) => {
139
46
                        arg_as_string(&mut output, v);
140
46
                    }
141
2
                    Either::Right(s) => {
142
2
                        write!(&mut output, "{s}").unwrap();
143
2
                    }
144
                }
145
146
48
                if buf_index == buf.len() {
147
28
                    break;
148
20
                }
149
            }
150
28
            output
151
28
        };
152
3
        fake_redis_internal(listener, vec![inner]).await;
153
0
    }
154
155
3
    pub async fn run(self) -> u16 {
156
3
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
157
3
        let port = listener.local_addr().unwrap().port();
158
3
        info!("Using port {port}");
159
160
3
        background_spawn!("listener", async move {
161
3
            self.dynamic_fake_redis(listener).await;
162
0
        });
163
164
3
        port
165
3
    }
166
}