Coverage Report

Created: 2026-03-06 12:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-redis-tester/src/read_only_redis.rs
Line
Count
Source
1
// Copyright 2026 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::Write;
16
use core::sync::atomic::{AtomicBool, Ordering};
17
use std::sync::Arc;
18
19
use either::Either;
20
use nativelink_util::background_spawn;
21
use redis::Value;
22
use redis_protocol::resp2::decode::decode;
23
use redis_protocol::resp2::types::OwnedFrame;
24
use tokio::net::TcpListener;
25
use tracing::info;
26
27
use crate::fake_redis::{arg_as_string, fake_redis_internal};
28
29
/// Fixed 40-hex-character digest handed back for every `SCRIPT LOAD`; the mock never
/// hashes or stores the script body it receives.
const FAKE_SCRIPT_SHA: &str = "b22b9926cbce9dd9ba97fa7ba3626f89feea1ed5";
30
31
/// State for a mock Redis server that rejects the first write command it sees with a
/// `READONLY` error and answers later writes normally.
///
/// The flag lives behind an `Arc`, so every clone observes and updates the same state.
#[derive(Clone, Debug)]
pub struct ReadOnlyRedis {
    // Set once the first write command (SETRANGE, RENAME or HMSET) has been answered with a
    // READONLY error. From then on we assume the client has reconnected and reply with the
    // regular success values.
    readonly_triggered: Arc<AtomicBool>,
}
36
37
impl Default for ReadOnlyRedis {
38
0
    fn default() -> Self {
39
0
        Self::new()
40
0
    }
41
}
42
43
impl ReadOnlyRedis {
44
3
    pub fn new() -> Self {
45
3
        Self {
46
3
            readonly_triggered: Arc::new(AtomicBool::new(false)),
47
3
        }
48
3
    }
49
50
3
    async fn dynamic_fake_redis(self, listener: TcpListener) {
51
3
        let readonly_err_str = "READONLY You can't write against a read only replica.";
52
3
        let readonly_err = format!("!{}\r\n{readonly_err_str}\r\n", readonly_err_str.len());
53
54
27
        let 
inner3
= move |buf: &[u8]| -> String {
55
27
            let mut output = String::new();
56
27
            let mut buf_index = 0;
57
            loop {
58
47
                let frame = match decode(&buf[buf_index..]).unwrap() {
59
47
                    Some((frame, amt)) => {
60
47
                        buf_index += amt;
61
47
                        frame
62
                    }
63
                    None => {
64
0
                        panic!("No frame!");
65
                    }
66
                };
67
47
                let (cmd, args) = {
68
47
                    if let OwnedFrame::Array(a) = frame {
  Branch (68:28): [Folded - Ignored]
  Branch (68:28): [Folded - Ignored]
  Branch (68:28): [True: 47, False: 0]
69
47
                        if let OwnedFrame::BulkString(s) = a.first().unwrap() {
  Branch (69:32): [Folded - Ignored]
  Branch (69:32): [Folded - Ignored]
  Branch (69:32): [True: 47, False: 0]
70
47
                            let args: Vec<_> = a[1..].to_vec();
71
47
                            (str::from_utf8(s).unwrap().to_string(), args)
72
                        } else {
73
0
                            panic!("Array not starting with cmd: {a:?}");
74
                        }
75
                    } else {
76
0
                        panic!("Non array cmd: {frame:?}");
77
                    }
78
                };
79
80
47
                let ret: Either<Value, String> = match cmd.as_str() {
81
47
                    "HELLO" => 
Either::Left(Value::Map(10
vec!10
[(
82
10
                        Value::SimpleString("server".into()),
83
10
                        Value::SimpleString("redis".into()),
84
10
                    )])),
85
37
                    "CLIENT" => {
86
                        // We can safely ignore these, as it's just setting the library name/version
87
20
                        Either::Left(Value::Int(0))
88
                    }
89
17
                    "SCRIPT" => {
90
5
                        assert_eq!(args[0], OwnedFrame::BulkString(b"LOAD".to_vec()));
91
92
5
                        let OwnedFrame::BulkString(ref _script) = args[1] else {
  Branch (92:29): [Folded - Ignored]
  Branch (92:29): [Folded - Ignored]
  Branch (92:29): [True: 5, False: 0]
93
0
                            panic!("Script should be a bulkstring: {args:?}");
94
                        };
95
5
                        Either::Left(Value::SimpleString(FAKE_SCRIPT_SHA.to_string()))
96
                    }
97
12
                    "ROLE" => 
Either::Left(Value::Array(5
vec!5
[
98
5
                        Value::BulkString(b"master".to_vec()),
99
5
                        Value::Int(0),
100
5
                        Value::Array(vec![]),
101
5
                    ])),
102
7
                    "SETRANGE" => {
103
2
                        let value = self.readonly_triggered.load(Ordering::Relaxed);
104
2
                        if value {
  Branch (104:28): [Folded - Ignored]
  Branch (104:28): [Folded - Ignored]
  Branch (104:28): [True: 1, False: 1]
105
1
                            Either::Left(Value::Int(5))
106
                        } else {
107
1
                            self.readonly_triggered.store(true, Ordering::Relaxed);
108
1
                            Either::Right(readonly_err.clone())
109
                        }
110
                    }
111
5
                    "STRLEN" => 
Either::Left(Value::Int(5))1
,
112
4
                    "RENAME" | 
"HMSET"3
=> {
113
3
                        let value = self.readonly_triggered.load(Ordering::Relaxed);
114
3
                        if value {
  Branch (114:28): [Folded - Ignored]
  Branch (114:28): [Folded - Ignored]
  Branch (114:28): [True: 2, False: 1]
115
2
                            Either::Left(Value::Okay)
116
                        } else {
117
1
                            self.readonly_triggered.store(true, Ordering::Relaxed);
118
1
                            Either::Right(readonly_err.clone())
119
                        }
120
                    }
121
1
                    "EVALSHA" => Either::Left(Value::Array(vec![Value::Int(1), Value::Int(0)])),
122
0
                    actual => {
123
0
                        panic!("Mock command not implemented! {actual:?}");
124
                    }
125
                };
126
127
47
                match ret {
128
45
                    Either::Left(v) => {
129
45
                        arg_as_string(&mut output, v);
130
45
                    }
131
2
                    Either::Right(s) => {
132
2
                        write!(&mut output, "{s}").unwrap();
133
2
                    }
134
                }
135
136
47
                if buf_index == buf.len() {
  Branch (136:20): [Folded - Ignored]
  Branch (136:20): [Folded - Ignored]
  Branch (136:20): [True: 27, False: 20]
137
27
                    break;
138
20
                }
139
            }
140
27
            output
141
27
        };
142
3
        fake_redis_internal(listener, vec![inner]).await;
143
0
    }
144
145
3
    pub async fn run(self) -> u16 {
146
3
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
147
3
        let port = listener.local_addr().unwrap().port();
148
3
        info!("Using port {port}");
149
150
3
        background_spawn!("listener", async move {
151
3
            self.dynamic_fake_redis(listener).await;
152
0
        });
153
154
3
        port
155
3
    }
156
}