/build/source/nativelink-store/src/redis_utils/ft_aggregate.rs
Line | Count | Source |
1 | | // Copyright 2024-2025 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::fmt::Debug; |
16 | | |
17 | | use futures::Stream; |
18 | | use nativelink_error::Error; |
19 | | use redis::aio::ConnectionLike; |
20 | | use redis::{Arg, ErrorKind, RedisError, Value}; |
21 | | use tracing::error; |
22 | | |
23 | | use crate::redis_utils::aggregate_types::RedisCursorData; |
24 | | use crate::redis_utils::ft_cursor_read::ft_cursor_read; |
25 | | |
/// Cursor settings that `ft_aggregate` sends after `WITHCURSOR` in an
/// `FT.AGGREGATE` command.
#[derive(Debug)]
pub(crate) struct FtAggregateCursor {
    /// Value sent as the `COUNT` argument.
    pub count: u64,
    /// Value sent as the `MAXIDLE` argument.
    // NOTE(review): RediSearch documents MAXIDLE in milliseconds — confirm callers use that unit.
    pub max_idle: u64,
}
31 | | |
32 | | #[derive(Debug)] |
33 | | pub(crate) struct FtAggregateOptions { |
34 | | pub load: Vec<String>, |
35 | | pub cursor: FtAggregateCursor, |
36 | | pub sort_by: Vec<String>, |
37 | | } |
38 | | |
39 | | /// Calls `FT.AGGREGATE` in redis. redis-rs does not properly support this command |
40 | | /// so we have to manually handle it. |
41 | 25 | pub(crate) async fn ft_aggregate<C>( |
42 | 25 | mut connection_manager: C, |
43 | 25 | index: String, |
44 | 25 | query: String, |
45 | 25 | options: FtAggregateOptions, |
46 | 25 | ) -> Result<impl Stream<Item = Result<Value, RedisError>> + Send, Error> |
47 | 25 | where |
48 | 25 | C: ConnectionLike + Send, |
49 | 25 | { |
50 | | struct State<C: ConnectionLike> { |
51 | | connection_manager: C, |
52 | | index: String, |
53 | | data: RedisCursorData, |
54 | | } |
55 | | |
56 | 25 | let mut cmd = redis::cmd("FT.AGGREGATE"); |
57 | 25 | let mut ft_aggregate_cmd = cmd |
58 | 25 | .arg(&index) |
59 | 25 | .arg(&query) |
60 | 25 | .arg("LOAD") |
61 | 25 | .arg(options.load.len()) |
62 | 25 | .arg(&options.load) |
63 | 25 | .arg("WITHCURSOR") |
64 | 25 | .arg("COUNT") |
65 | 25 | .arg(options.cursor.count) |
66 | 25 | .arg("MAXIDLE") |
67 | 25 | .arg(options.cursor.max_idle) |
68 | 25 | .arg("SORTBY") |
69 | 25 | .arg(options.sort_by.len() * 2); |
70 | 47 | for key22 in &options.sort_by { |
71 | 22 | ft_aggregate_cmd = ft_aggregate_cmd.arg(key).arg("ASC"); |
72 | 22 | } |
73 | 25 | let res = ft_aggregate_cmd |
74 | 25 | .query_async::<Value>(&mut connection_manager) |
75 | 25 | .await; |
76 | 25 | let data23 = match res { |
77 | 23 | Ok(d) => d, |
78 | 2 | Err(e) => { |
79 | 2 | let all_args: Vec<_> = ft_aggregate_cmd |
80 | 2 | .args_iter() |
81 | 32 | .map2 (|a| match a { |
82 | 32 | Arg::Simple(bytes) => match str::from_utf8(bytes) { |
83 | 32 | Ok(s) => s.to_string(), |
84 | 0 | Err(_) => format!("{bytes:?}"), |
85 | | }, |
86 | 0 | other => { |
87 | 0 | format!("{other:?}") |
88 | | } |
89 | 32 | }) |
90 | 2 | .collect(); |
91 | 2 | error!( |
92 | | ?e, |
93 | | index, |
94 | | ?query, |
95 | | ?options, |
96 | | ?all_args, |
97 | 2 | "Error calling ft.aggregate" |
98 | | ); |
99 | 2 | return Err(e.into()); |
100 | | } |
101 | | }; |
102 | | |
103 | 23 | let state = State { |
104 | 23 | connection_manager, |
105 | 23 | index, |
106 | 23 | data: data.try_into()?0 , |
107 | | }; |
108 | | |
109 | 23 | Ok(futures::stream::unfold( |
110 | 23 | Some(state), |
111 | 34 | move |maybe_state| async move { |
112 | 34 | let mut state = maybe_state?0 ; |
113 | | loop { |
114 | 34 | if let Some(map13 ) = state.data.data.pop_front() { Branch (114:24): [True: 0, False: 0]
Branch (114:24): [Folded - Ignored]
Branch (114:24): [Folded - Ignored]
Branch (114:24): [True: 10, False: 18]
Branch (114:24): [True: 3, False: 3]
|
115 | 13 | return Some((Ok(map), Some(state))); |
116 | 21 | } |
117 | 21 | if state.data.cursor == 0 { Branch (117:20): [True: 0, False: 0]
Branch (117:20): [Folded - Ignored]
Branch (117:20): [Folded - Ignored]
Branch (117:20): [True: 18, False: 0]
Branch (117:20): [True: 3, False: 0]
|
118 | 21 | return None; |
119 | 0 | } |
120 | 0 | let data_res = ft_cursor_read( |
121 | 0 | &mut state.connection_manager, |
122 | 0 | state.index.clone(), |
123 | 0 | state.data.cursor, |
124 | 0 | ) |
125 | 0 | .await; |
126 | 0 | state.data = match data_res { |
127 | 0 | Ok(data) => data, |
128 | 0 | Err(err) => return Some((Err(err), None)), |
129 | | }; |
130 | | } |
131 | 68 | }, |
132 | | )) |
133 | 25 | } |
134 | | |
135 | 22 | fn resp2_data_parse( |
136 | 22 | output: &mut RedisCursorData, |
137 | 22 | results_array: &[Value], |
138 | 22 | ) -> Result<(), RedisError> { |
139 | 22 | let mut results_iter = results_array.iter(); |
140 | 22 | match results_iter.next() { |
141 | 22 | Some(Value::Int(t)) => { |
142 | 22 | output.total = *t; |
143 | 22 | } |
144 | 0 | Some(other) => { |
145 | 0 | error!(?other, "Non-int for first value in ft.aggregate"); |
146 | 0 | return Err(RedisError::from(( |
147 | 0 | ErrorKind::Parse, |
148 | 0 | "Non int for aggregate total", |
149 | 0 | format!("{other:?}"), |
150 | 0 | ))); |
151 | | } |
152 | | None => { |
153 | 0 | error!("No items in results array for ft.aggregate!"); |
154 | 0 | return Err(RedisError::from(( |
155 | 0 | ErrorKind::Parse, |
156 | 0 | "No items in results array for ft.aggregate", |
157 | 0 | ))); |
158 | | } |
159 | | } |
160 | | |
161 | 34 | for item12 in results_iter { |
162 | 12 | match item { |
163 | 12 | Value::Array(items) if items.len() % 2 == 0 => {} Branch (163:36): [True: 12, False: 0]
Branch (163:36): [Folded - Ignored]
|
164 | 0 | other => { |
165 | 0 | error!( |
166 | | ?other, |
167 | 0 | "Expected an array with an even number of items, didn't get it for aggregate value" |
168 | | ); |
169 | 0 | return Err(RedisError::from(( |
170 | 0 | ErrorKind::Parse, |
171 | 0 | "Expected an array with an even number of items, didn't get it for aggregate value", |
172 | 0 | format!("{other:?}"), |
173 | 0 | ))); |
174 | | } |
175 | | } |
176 | | |
177 | 12 | output.data.push_back(item.clone()); |
178 | | } |
179 | 22 | Ok(()) |
180 | 22 | } |
181 | | |
182 | 1 | fn resp3_data_parse( |
183 | 1 | output: &mut RedisCursorData, |
184 | 1 | results_map: &Vec<(Value, Value)>, |
185 | 1 | ) -> Result<(), RedisError> { |
186 | 6 | for (raw_key5 , value5 ) in results_map { |
187 | 5 | let Value::SimpleString(key) = raw_key else { Branch (187:13): [True: 5, False: 0]
Branch (187:13): [Folded - Ignored]
|
188 | 0 | return Err(RedisError::from(( |
189 | 0 | ErrorKind::Parse, |
190 | 0 | "Expected SimpleString keys", |
191 | 0 | format!("{raw_key:?}"), |
192 | 0 | ))); |
193 | | }; |
194 | 5 | match key.as_str() { |
195 | 5 | "attributes" => { |
196 | 1 | let Value::Array(attributes) = value else { Branch (196:21): [True: 1, False: 0]
Branch (196:21): [Folded - Ignored]
|
197 | 0 | return Err(RedisError::from(( |
198 | 0 | ErrorKind::Parse, |
199 | 0 | "Expected array for attributes", |
200 | 0 | format!("{value:?}"), |
201 | 0 | ))); |
202 | | }; |
203 | 1 | if !attributes.is_empty() { Branch (203:20): [True: 0, False: 1]
Branch (203:20): [Folded - Ignored]
|
204 | 0 | return Err(RedisError::from(( |
205 | 0 | ErrorKind::Parse, |
206 | 0 | "Expected empty attributes", |
207 | 0 | format!("{attributes:?}"), |
208 | 0 | ))); |
209 | 1 | } |
210 | | } |
211 | 4 | "format" => { |
212 | 1 | let Value::SimpleString(format) = value else { Branch (212:21): [True: 1, False: 0]
Branch (212:21): [Folded - Ignored]
|
213 | 0 | return Err(RedisError::from(( |
214 | 0 | ErrorKind::Parse, |
215 | 0 | "Expected SimpleString for format", |
216 | 0 | format!("{value:?}"), |
217 | 0 | ))); |
218 | | }; |
219 | 1 | if format.as_str() != "STRING" { Branch (219:20): [True: 0, False: 1]
Branch (219:20): [Folded - Ignored]
|
220 | 0 | return Err(RedisError::from(( |
221 | 0 | ErrorKind::Parse, |
222 | 0 | "Expected STRING format", |
223 | 0 | format.to_string(), |
224 | 0 | ))); |
225 | 1 | } |
226 | | } |
227 | 3 | "results" => { |
228 | 1 | let Value::Array(values) = value else { Branch (228:21): [True: 1, False: 0]
Branch (228:21): [Folded - Ignored]
|
229 | 0 | return Err(RedisError::from(( |
230 | 0 | ErrorKind::Parse, |
231 | 0 | "Expected Array for results", |
232 | 0 | format!("{value:?}"), |
233 | 0 | ))); |
234 | | }; |
235 | 2 | for raw_value1 in values { |
236 | 1 | let Value::Map(value) = raw_value else { Branch (236:25): [True: 1, False: 0]
Branch (236:25): [Folded - Ignored]
|
237 | 0 | return Err(RedisError::from(( |
238 | 0 | ErrorKind::Parse, |
239 | 0 | "Expected list of maps in result", |
240 | 0 | format!("{raw_value:?}"), |
241 | 0 | ))); |
242 | | }; |
243 | 3 | for (raw_map_key2 , raw_map_value2 ) in value { |
244 | 2 | let Value::SimpleString(map_key) = raw_map_key else { Branch (244:29): [True: 2, False: 0]
Branch (244:29): [Folded - Ignored]
|
245 | 0 | return Err(RedisError::from(( |
246 | 0 | ErrorKind::Parse, |
247 | 0 | "Expected SimpleString keys for result maps", |
248 | 0 | format!("{raw_key:?}"), |
249 | 0 | ))); |
250 | | }; |
251 | 2 | match map_key.as_str() { |
252 | 2 | "extra_attributes" => { |
253 | 1 | let Value::Map(extra_attributes_values) = raw_map_value else { Branch (253:37): [True: 1, False: 0]
Branch (253:37): [Folded - Ignored]
|
254 | 0 | return Err(RedisError::from(( |
255 | 0 | ErrorKind::Parse, |
256 | 0 | "Expected Map for extra_attributes", |
257 | 0 | format!("{raw_map_value:?}"), |
258 | 0 | ))); |
259 | | }; |
260 | 1 | let mut output_array = vec![]; |
261 | 3 | for (e_key2 , e_value2 ) in extra_attributes_values { |
262 | 2 | output_array.push(e_key.clone()); |
263 | 2 | output_array.push(e_value.clone()); |
264 | 2 | } |
265 | 1 | output.data.push_back(Value::Array(output_array)); |
266 | | } |
267 | 1 | "values" => { |
268 | 1 | let Value::Array(values_values) = raw_map_value else { Branch (268:37): [True: 1, False: 0]
Branch (268:37): [Folded - Ignored]
|
269 | 0 | return Err(RedisError::from(( |
270 | 0 | ErrorKind::Parse, |
271 | 0 | "Expected Array for values", |
272 | 0 | format!("{raw_map_value:?}"), |
273 | 0 | ))); |
274 | | }; |
275 | 1 | if !values_values.is_empty() { Branch (275:36): [True: 0, False: 1]
Branch (275:36): [Folded - Ignored]
|
276 | 0 | return Err(RedisError::from(( |
277 | 0 | ErrorKind::Parse, |
278 | 0 | "Expected empty values (all in extra_attributes)", |
279 | 0 | format!("{values_values:?}"), |
280 | 0 | ))); |
281 | 1 | } |
282 | | } |
283 | | _ => { |
284 | 0 | return Err(RedisError::from(( |
285 | 0 | ErrorKind::Parse, |
286 | 0 | "Unknown result map key", |
287 | 0 | format!("{map_key:?}"), |
288 | 0 | ))); |
289 | | } |
290 | | } |
291 | | } |
292 | | } |
293 | | } |
294 | 2 | "total_results" => { |
295 | 1 | let Value::Int(total) = value else { Branch (295:21): [True: 1, False: 0]
Branch (295:21): [Folded - Ignored]
|
296 | 0 | return Err(RedisError::from(( |
297 | 0 | ErrorKind::Parse, |
298 | 0 | "Expected int for total_results", |
299 | 0 | format!("{value:?}"), |
300 | 0 | ))); |
301 | | }; |
302 | 1 | output.total = *total; |
303 | | } |
304 | 1 | "warning" => { |
305 | 1 | let Value::Array(warnings) = value else { Branch (305:21): [True: 1, False: 0]
Branch (305:21): [Folded - Ignored]
|
306 | 0 | return Err(RedisError::from(( |
307 | 0 | ErrorKind::Parse, |
308 | 0 | "Expected Array for warning", |
309 | 0 | format!("{value:?}"), |
310 | 0 | ))); |
311 | | }; |
312 | 1 | if !warnings.is_empty() { Branch (312:20): [True: 0, False: 1]
Branch (312:20): [Folded - Ignored]
|
313 | 0 | return Err(RedisError::from(( |
314 | 0 | ErrorKind::Parse, |
315 | 0 | "Expected empty warnings", |
316 | 0 | format!("{warnings:?}"), |
317 | 0 | ))); |
318 | 1 | } |
319 | | } |
320 | | _ => { |
321 | 0 | return Err(RedisError::from(( |
322 | 0 | ErrorKind::Parse, |
323 | 0 | "Unexpected key in ft.aggregate", |
324 | 0 | format!("{key} => {value:?}"), |
325 | 0 | ))); |
326 | | } |
327 | | } |
328 | | } |
329 | 1 | Ok(()) |
330 | 1 | } |
331 | | |
332 | | impl TryFrom<Value> for RedisCursorData { |
333 | | type Error = RedisError; |
334 | 23 | fn try_from(raw_value: Value) -> Result<Self, RedisError> { |
335 | 23 | let Value::Array(value) = raw_value else { Branch (335:13): [True: 23, False: 0]
Branch (335:13): [Folded - Ignored]
|
336 | 0 | error!( |
337 | | ?raw_value, |
338 | 0 | "Bad data in ft.aggregate, expected array at top-level" |
339 | | ); |
340 | 0 | return Err(RedisError::from((ErrorKind::Parse, "Expected array"))); |
341 | | }; |
342 | 23 | if value.len() < 2 { Branch (342:12): [True: 0, False: 23]
Branch (342:12): [Folded - Ignored]
|
343 | 0 | return Err(RedisError::from(( |
344 | 0 | ErrorKind::Parse, |
345 | 0 | "Expected at least 2 elements", |
346 | 0 | ))); |
347 | 23 | } |
348 | 23 | let mut output = Self::default(); |
349 | 23 | let mut value = value.into_iter(); |
350 | 23 | match value.next().unwrap() { |
351 | 22 | Value::Array(d) => resp2_data_parse(&mut output, &d)?0 , |
352 | 1 | Value::Map(d) => resp3_data_parse(&mut output, &d)?0 , |
353 | 0 | other => { |
354 | 0 | error!( |
355 | | ?other, |
356 | 0 | "Bad data in ft.aggregate, expected array for results" |
357 | | ); |
358 | 0 | return Err(RedisError::from(( |
359 | 0 | ErrorKind::Parse, |
360 | 0 | "Non map item", |
361 | 0 | format!("{other:?}"), |
362 | 0 | ))); |
363 | | } |
364 | | } |
365 | 23 | let Value::Int(cursor) = value.next().unwrap() else { Branch (365:13): [True: 23, False: 0]
Branch (365:13): [Folded - Ignored]
|
366 | 0 | return Err(RedisError::from(( |
367 | 0 | ErrorKind::Parse, |
368 | 0 | "Expected integer as last element", |
369 | 0 | ))); |
370 | | }; |
371 | 23 | output.cursor = cursor as u64; |
372 | 23 | Ok(output) |
373 | 23 | } |
374 | | } |