/build/source/nativelink-redis-tester/src/read_only_redis.rs
Line | Count | Source |
1 | | // Copyright 2026 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::fmt::Write; |
16 | | use core::sync::atomic::{AtomicBool, Ordering}; |
17 | | use std::sync::Arc; |
18 | | |
19 | | use either::Either; |
20 | | use nativelink_util::background_spawn; |
21 | | use redis::Value; |
22 | | use redis_protocol::resp2::decode::decode; |
23 | | use redis_protocol::resp2::types::OwnedFrame; |
24 | | use tokio::net::TcpListener; |
25 | | use tracing::info; |
26 | | |
27 | | use crate::fake_redis::{arg_as_string, fake_redis_internal}; |
28 | | |
29 | | const FAKE_SCRIPT_SHA: &str = "b22b9926cbce9dd9ba97fa7ba3626f89feea1ed5"; |
30 | | |
31 | | #[derive(Clone, Debug)] |
32 | | pub struct ReadOnlyRedis { |
33 | | // The first time we hit SETRANGE/HMSET, we output a ReadOnly. Next time, we assume we're reconnected and do correct values |
34 | | readonly_triggered: Arc<AtomicBool>, |
35 | | } |
36 | | |
37 | | impl Default for ReadOnlyRedis { |
38 | 0 | fn default() -> Self { |
39 | 0 | Self::new() |
40 | 0 | } |
41 | | } |
42 | | |
43 | | impl ReadOnlyRedis { |
44 | 3 | pub fn new() -> Self { |
45 | 3 | Self { |
46 | 3 | readonly_triggered: Arc::new(AtomicBool::new(false)), |
47 | 3 | } |
48 | 3 | } |
49 | | |
50 | 3 | async fn dynamic_fake_redis(self, listener: TcpListener) { |
51 | 3 | let readonly_err_str = "READONLY You can't write against a read only replica."; |
52 | 3 | let readonly_err = format!("!{}\r\n{readonly_err_str}\r\n", readonly_err_str.len()); |
53 | | |
54 | 27 | let inner3 = move |buf: &[u8]| -> String { |
55 | 27 | let mut output = String::new(); |
56 | 27 | let mut buf_index = 0; |
57 | | loop { |
58 | 47 | let frame = match decode(&buf[buf_index..]).unwrap() { |
59 | 47 | Some((frame, amt)) => { |
60 | 47 | buf_index += amt; |
61 | 47 | frame |
62 | | } |
63 | | None => { |
64 | 0 | panic!("No frame!"); |
65 | | } |
66 | | }; |
67 | 47 | let (cmd, args) = { |
68 | 47 | if let OwnedFrame::Array(a) = frame { Branch (68:28): [Folded - Ignored]
Branch (68:28): [Folded - Ignored]
Branch (68:28): [True: 47, False: 0]
|
69 | 47 | if let OwnedFrame::BulkString(s) = a.first().unwrap() { Branch (69:32): [Folded - Ignored]
Branch (69:32): [Folded - Ignored]
Branch (69:32): [True: 47, False: 0]
|
70 | 47 | let args: Vec<_> = a[1..].to_vec(); |
71 | 47 | (str::from_utf8(s).unwrap().to_string(), args) |
72 | | } else { |
73 | 0 | panic!("Array not starting with cmd: {a:?}"); |
74 | | } |
75 | | } else { |
76 | 0 | panic!("Non array cmd: {frame:?}"); |
77 | | } |
78 | | }; |
79 | | |
80 | 47 | let ret: Either<Value, String> = match cmd.as_str() { |
81 | 47 | "HELLO" => Either::Left(Value::Map(10 vec!10 [( |
82 | 10 | Value::SimpleString("server".into()), |
83 | 10 | Value::SimpleString("redis".into()), |
84 | 10 | )])), |
85 | 37 | "CLIENT" => { |
86 | | // We can safely ignore these, as it's just setting the library name/version |
87 | 20 | Either::Left(Value::Int(0)) |
88 | | } |
89 | 17 | "SCRIPT" => { |
90 | 5 | assert_eq!(args[0], OwnedFrame::BulkString(b"LOAD".to_vec())); |
91 | | |
92 | 5 | let OwnedFrame::BulkString(ref _script) = args[1] else { Branch (92:29): [Folded - Ignored]
Branch (92:29): [Folded - Ignored]
Branch (92:29): [True: 5, False: 0]
|
93 | 0 | panic!("Script should be a bulkstring: {args:?}"); |
94 | | }; |
95 | 5 | Either::Left(Value::SimpleString(FAKE_SCRIPT_SHA.to_string())) |
96 | | } |
97 | 12 | "ROLE" => Either::Left(Value::Array(5 vec!5 [ |
98 | 5 | Value::BulkString(b"master".to_vec()), |
99 | 5 | Value::Int(0), |
100 | 5 | Value::Array(vec![]), |
101 | 5 | ])), |
102 | 7 | "SETRANGE" => { |
103 | 2 | let value = self.readonly_triggered.load(Ordering::Relaxed); |
104 | 2 | if value { Branch (104:28): [Folded - Ignored]
Branch (104:28): [Folded - Ignored]
Branch (104:28): [True: 1, False: 1]
|
105 | 1 | Either::Left(Value::Int(5)) |
106 | | } else { |
107 | 1 | self.readonly_triggered.store(true, Ordering::Relaxed); |
108 | 1 | Either::Right(readonly_err.clone()) |
109 | | } |
110 | | } |
111 | 5 | "STRLEN" => Either::Left(Value::Int(5))1 , |
112 | 4 | "RENAME" | "HMSET"3 => { |
113 | 3 | let value = self.readonly_triggered.load(Ordering::Relaxed); |
114 | 3 | if value { Branch (114:28): [Folded - Ignored]
Branch (114:28): [Folded - Ignored]
Branch (114:28): [True: 2, False: 1]
|
115 | 2 | Either::Left(Value::Okay) |
116 | | } else { |
117 | 1 | self.readonly_triggered.store(true, Ordering::Relaxed); |
118 | 1 | Either::Right(readonly_err.clone()) |
119 | | } |
120 | | } |
121 | 1 | "EVALSHA" => Either::Left(Value::Array(vec![Value::Int(1), Value::Int(0)])), |
122 | 0 | actual => { |
123 | 0 | panic!("Mock command not implemented! {actual:?}"); |
124 | | } |
125 | | }; |
126 | | |
127 | 47 | match ret { |
128 | 45 | Either::Left(v) => { |
129 | 45 | arg_as_string(&mut output, v); |
130 | 45 | } |
131 | 2 | Either::Right(s) => { |
132 | 2 | write!(&mut output, "{s}").unwrap(); |
133 | 2 | } |
134 | | } |
135 | | |
136 | 47 | if buf_index == buf.len() { Branch (136:20): [Folded - Ignored]
Branch (136:20): [Folded - Ignored]
Branch (136:20): [True: 27, False: 20]
|
137 | 27 | break; |
138 | 20 | } |
139 | | } |
140 | 27 | output |
141 | 27 | }; |
142 | 3 | fake_redis_internal(listener, vec![inner]).await; |
143 | 0 | } |
144 | | |
145 | 3 | pub async fn run(self) -> u16 { |
146 | 3 | let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); |
147 | 3 | let port = listener.local_addr().unwrap().port(); |
148 | 3 | info!("Using port {port}"); |
149 | | |
150 | 3 | background_spawn!("listener", async move { |
151 | 3 | self.dynamic_fake_redis(listener).await; |
152 | 0 | }); |
153 | | |
154 | 3 | port |
155 | 3 | } |
156 | | } |