Coverage Report

Created: 2026-03-05 11:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/redis_utils/ft_aggregate.rs
Line
Count
Source
1
// Copyright 2024-2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::Debug;
16
17
use futures::Stream;
18
use nativelink_error::Error;
19
use redis::aio::ConnectionLike;
20
use redis::{Arg, ErrorKind, RedisError, Value};
21
use tracing::error;
22
23
use crate::redis_utils::aggregate_types::RedisCursorData;
24
use crate::redis_utils::ft_cursor_read::ft_cursor_read;
25
26
#[derive(Debug)]
27
pub(crate) struct FtAggregateCursor {
28
    pub count: u64,
29
    pub max_idle: u64,
30
}
31
32
#[derive(Debug)]
33
pub(crate) struct FtAggregateOptions {
34
    pub load: Vec<String>,
35
    pub cursor: FtAggregateCursor,
36
    pub sort_by: Vec<String>,
37
}
38
39
/// Calls `FT.AGGREGATE` in redis. redis-rs does not properly support this command
40
/// so we have to manually handle it.
41
25
pub(crate) async fn ft_aggregate<C>(
42
25
    mut connection_manager: C,
43
25
    index: String,
44
25
    query: String,
45
25
    options: FtAggregateOptions,
46
25
) -> Result<impl Stream<Item = Result<Value, RedisError>> + Send, Error>
47
25
where
48
25
    C: ConnectionLike + Send,
49
25
{
50
    struct State<C: ConnectionLike> {
51
        connection_manager: C,
52
        index: String,
53
        data: RedisCursorData,
54
    }
55
56
25
    let mut cmd = redis::cmd("FT.AGGREGATE");
57
25
    let mut ft_aggregate_cmd = cmd
58
25
        .arg(&index)
59
25
        .arg(&query)
60
25
        .arg("LOAD")
61
25
        .arg(options.load.len())
62
25
        .arg(&options.load)
63
25
        .arg("WITHCURSOR")
64
25
        .arg("COUNT")
65
25
        .arg(options.cursor.count)
66
25
        .arg("MAXIDLE")
67
25
        .arg(options.cursor.max_idle)
68
25
        .arg("SORTBY")
69
25
        .arg(options.sort_by.len() * 2);
70
47
    for 
key22
in &options.sort_by {
71
22
        ft_aggregate_cmd = ft_aggregate_cmd.arg(key).arg("ASC");
72
22
    }
73
25
    let res = ft_aggregate_cmd
74
25
        .query_async::<Value>(&mut connection_manager)
75
25
        .await;
76
25
    let 
data23
= match res {
77
23
        Ok(d) => d,
78
2
        Err(e) => {
79
2
            let all_args: Vec<_> = ft_aggregate_cmd
80
2
                .args_iter()
81
32
                .
map2
(|a| match a {
82
32
                    Arg::Simple(bytes) => match str::from_utf8(bytes) {
83
32
                        Ok(s) => s.to_string(),
84
0
                        Err(_) => format!("{bytes:?}"),
85
                    },
86
0
                    other => {
87
0
                        format!("{other:?}")
88
                    }
89
32
                })
90
2
                .collect();
91
2
            error!(
92
                ?e,
93
                index,
94
                ?query,
95
                ?options,
96
                ?all_args,
97
2
                "Error calling ft.aggregate"
98
            );
99
2
            return Err(e.into());
100
        }
101
    };
102
103
23
    let state = State {
104
23
        connection_manager,
105
23
        index,
106
23
        data: data.try_into()
?0
,
107
    };
108
109
23
    Ok(futures::stream::unfold(
110
23
        Some(state),
111
34
        move |maybe_state| async move {
112
34
            let mut state = maybe_state
?0
;
113
            loop {
114
34
                if let Some(
map13
) = state.data.data.pop_front() {
  Branch (114:24): [True: 0, False: 0]
  Branch (114:24): [Folded - Ignored]
  Branch (114:24): [Folded - Ignored]
  Branch (114:24): [True: 10, False: 18]
  Branch (114:24): [True: 3, False: 3]
115
13
                    return Some((Ok(map), Some(state)));
116
21
                }
117
21
                if state.data.cursor == 0 {
  Branch (117:20): [True: 0, False: 0]
  Branch (117:20): [Folded - Ignored]
  Branch (117:20): [Folded - Ignored]
  Branch (117:20): [True: 18, False: 0]
  Branch (117:20): [True: 3, False: 0]
118
21
                    return None;
119
0
                }
120
0
                let data_res = ft_cursor_read(
121
0
                    &mut state.connection_manager,
122
0
                    state.index.clone(),
123
0
                    state.data.cursor,
124
0
                )
125
0
                .await;
126
0
                state.data = match data_res {
127
0
                    Ok(data) => data,
128
0
                    Err(err) => return Some((Err(err), None)),
129
                };
130
            }
131
68
        },
132
    ))
133
25
}
134
135
22
fn resp2_data_parse(
136
22
    output: &mut RedisCursorData,
137
22
    results_array: &[Value],
138
22
) -> Result<(), RedisError> {
139
22
    let mut results_iter = results_array.iter();
140
22
    match results_iter.next() {
141
22
        Some(Value::Int(t)) => {
142
22
            output.total = *t;
143
22
        }
144
0
        Some(other) => {
145
0
            error!(?other, "Non-int for first value in ft.aggregate");
146
0
            return Err(RedisError::from((
147
0
                ErrorKind::Parse,
148
0
                "Non int for aggregate total",
149
0
                format!("{other:?}"),
150
0
            )));
151
        }
152
        None => {
153
0
            error!("No items in results array for ft.aggregate!");
154
0
            return Err(RedisError::from((
155
0
                ErrorKind::Parse,
156
0
                "No items in results array for ft.aggregate",
157
0
            )));
158
        }
159
    }
160
161
34
    for 
item12
in results_iter {
162
12
        match item {
163
12
            Value::Array(items) if items.len() % 2 == 0 => {}
  Branch (163:36): [True: 12, False: 0]
  Branch (163:36): [Folded - Ignored]
164
0
            other => {
165
0
                error!(
166
                    ?other,
167
0
                    "Expected an array with an even number of items, didn't get it for aggregate value"
168
                );
169
0
                return Err(RedisError::from((
170
0
                    ErrorKind::Parse,
171
0
                    "Expected an array with an even number of items, didn't get it for aggregate value",
172
0
                    format!("{other:?}"),
173
0
                )));
174
            }
175
        }
176
177
12
        output.data.push_back(item.clone());
178
    }
179
22
    Ok(())
180
22
}
181
182
1
fn resp3_data_parse(
183
1
    output: &mut RedisCursorData,
184
1
    results_map: &Vec<(Value, Value)>,
185
1
) -> Result<(), RedisError> {
186
6
    for (
raw_key5
,
value5
) in results_map {
187
5
        let Value::SimpleString(key) = raw_key else {
  Branch (187:13): [True: 5, False: 0]
  Branch (187:13): [Folded - Ignored]
188
0
            return Err(RedisError::from((
189
0
                ErrorKind::Parse,
190
0
                "Expected SimpleString keys",
191
0
                format!("{raw_key:?}"),
192
0
            )));
193
        };
194
5
        match key.as_str() {
195
5
            "attributes" => {
196
1
                let Value::Array(attributes) = value else {
  Branch (196:21): [True: 1, False: 0]
  Branch (196:21): [Folded - Ignored]
197
0
                    return Err(RedisError::from((
198
0
                        ErrorKind::Parse,
199
0
                        "Expected array for attributes",
200
0
                        format!("{value:?}"),
201
0
                    )));
202
                };
203
1
                if !attributes.is_empty() {
  Branch (203:20): [True: 0, False: 1]
  Branch (203:20): [Folded - Ignored]
204
0
                    return Err(RedisError::from((
205
0
                        ErrorKind::Parse,
206
0
                        "Expected empty attributes",
207
0
                        format!("{attributes:?}"),
208
0
                    )));
209
1
                }
210
            }
211
4
            "format" => {
212
1
                let Value::SimpleString(format) = value else {
  Branch (212:21): [True: 1, False: 0]
  Branch (212:21): [Folded - Ignored]
213
0
                    return Err(RedisError::from((
214
0
                        ErrorKind::Parse,
215
0
                        "Expected SimpleString for format",
216
0
                        format!("{value:?}"),
217
0
                    )));
218
                };
219
1
                if format.as_str() != "STRING" {
  Branch (219:20): [True: 0, False: 1]
  Branch (219:20): [Folded - Ignored]
220
0
                    return Err(RedisError::from((
221
0
                        ErrorKind::Parse,
222
0
                        "Expected STRING format",
223
0
                        format.to_string(),
224
0
                    )));
225
1
                }
226
            }
227
3
            "results" => {
228
1
                let Value::Array(values) = value else {
  Branch (228:21): [True: 1, False: 0]
  Branch (228:21): [Folded - Ignored]
229
0
                    return Err(RedisError::from((
230
0
                        ErrorKind::Parse,
231
0
                        "Expected Array for results",
232
0
                        format!("{value:?}"),
233
0
                    )));
234
                };
235
2
                for 
raw_value1
in values {
236
1
                    let Value::Map(value) = raw_value else {
  Branch (236:25): [True: 1, False: 0]
  Branch (236:25): [Folded - Ignored]
237
0
                        return Err(RedisError::from((
238
0
                            ErrorKind::Parse,
239
0
                            "Expected list of maps in result",
240
0
                            format!("{raw_value:?}"),
241
0
                        )));
242
                    };
243
3
                    for (
raw_map_key2
,
raw_map_value2
) in value {
244
2
                        let Value::SimpleString(map_key) = raw_map_key else {
  Branch (244:29): [True: 2, False: 0]
  Branch (244:29): [Folded - Ignored]
245
0
                            return Err(RedisError::from((
246
0
                                ErrorKind::Parse,
247
0
                                "Expected SimpleString keys for result maps",
248
0
                                format!("{raw_key:?}"),
249
0
                            )));
250
                        };
251
2
                        match map_key.as_str() {
252
2
                            "extra_attributes" => {
253
1
                                let Value::Map(extra_attributes_values) = raw_map_value else {
  Branch (253:37): [True: 1, False: 0]
  Branch (253:37): [Folded - Ignored]
254
0
                                    return Err(RedisError::from((
255
0
                                        ErrorKind::Parse,
256
0
                                        "Expected Map for extra_attributes",
257
0
                                        format!("{raw_map_value:?}"),
258
0
                                    )));
259
                                };
260
1
                                let mut output_array = vec![];
261
3
                                for (
e_key2
,
e_value2
) in extra_attributes_values {
262
2
                                    output_array.push(e_key.clone());
263
2
                                    output_array.push(e_value.clone());
264
2
                                }
265
1
                                output.data.push_back(Value::Array(output_array));
266
                            }
267
1
                            "values" => {
268
1
                                let Value::Array(values_values) = raw_map_value else {
  Branch (268:37): [True: 1, False: 0]
  Branch (268:37): [Folded - Ignored]
269
0
                                    return Err(RedisError::from((
270
0
                                        ErrorKind::Parse,
271
0
                                        "Expected Array for values",
272
0
                                        format!("{raw_map_value:?}"),
273
0
                                    )));
274
                                };
275
1
                                if !values_values.is_empty() {
  Branch (275:36): [True: 0, False: 1]
  Branch (275:36): [Folded - Ignored]
276
0
                                    return Err(RedisError::from((
277
0
                                        ErrorKind::Parse,
278
0
                                        "Expected empty values (all in extra_attributes)",
279
0
                                        format!("{values_values:?}"),
280
0
                                    )));
281
1
                                }
282
                            }
283
                            _ => {
284
0
                                return Err(RedisError::from((
285
0
                                    ErrorKind::Parse,
286
0
                                    "Unknown result map key",
287
0
                                    format!("{map_key:?}"),
288
0
                                )));
289
                            }
290
                        }
291
                    }
292
                }
293
            }
294
2
            "total_results" => {
295
1
                let Value::Int(total) = value else {
  Branch (295:21): [True: 1, False: 0]
  Branch (295:21): [Folded - Ignored]
296
0
                    return Err(RedisError::from((
297
0
                        ErrorKind::Parse,
298
0
                        "Expected int for total_results",
299
0
                        format!("{value:?}"),
300
0
                    )));
301
                };
302
1
                output.total = *total;
303
            }
304
1
            "warning" => {
305
1
                let Value::Array(warnings) = value else {
  Branch (305:21): [True: 1, False: 0]
  Branch (305:21): [Folded - Ignored]
306
0
                    return Err(RedisError::from((
307
0
                        ErrorKind::Parse,
308
0
                        "Expected Array for warning",
309
0
                        format!("{value:?}"),
310
0
                    )));
311
                };
312
1
                if !warnings.is_empty() {
  Branch (312:20): [True: 0, False: 1]
  Branch (312:20): [Folded - Ignored]
313
0
                    return Err(RedisError::from((
314
0
                        ErrorKind::Parse,
315
0
                        "Expected empty warnings",
316
0
                        format!("{warnings:?}"),
317
0
                    )));
318
1
                }
319
            }
320
            _ => {
321
0
                return Err(RedisError::from((
322
0
                    ErrorKind::Parse,
323
0
                    "Unexpected key in ft.aggregate",
324
0
                    format!("{key} => {value:?}"),
325
0
                )));
326
            }
327
        }
328
    }
329
1
    Ok(())
330
1
}
331
332
impl TryFrom<Value> for RedisCursorData {
333
    type Error = RedisError;
334
23
    fn try_from(raw_value: Value) -> Result<Self, RedisError> {
335
23
        let Value::Array(value) = raw_value else {
  Branch (335:13): [True: 23, False: 0]
  Branch (335:13): [Folded - Ignored]
336
0
            error!(
337
                ?raw_value,
338
0
                "Bad data in ft.aggregate, expected array at top-level"
339
            );
340
0
            return Err(RedisError::from((ErrorKind::Parse, "Expected array")));
341
        };
342
23
        if value.len() < 2 {
  Branch (342:12): [True: 0, False: 23]
  Branch (342:12): [Folded - Ignored]
343
0
            return Err(RedisError::from((
344
0
                ErrorKind::Parse,
345
0
                "Expected at least 2 elements",
346
0
            )));
347
23
        }
348
23
        let mut output = Self::default();
349
23
        let mut value = value.into_iter();
350
23
        match value.next().unwrap() {
351
22
            Value::Array(d) => resp2_data_parse(&mut output, &d)
?0
,
352
1
            Value::Map(d) => resp3_data_parse(&mut output, &d)
?0
,
353
0
            other => {
354
0
                error!(
355
                    ?other,
356
0
                    "Bad data in ft.aggregate, expected array for results"
357
                );
358
0
                return Err(RedisError::from((
359
0
                    ErrorKind::Parse,
360
0
                    "Non map item",
361
0
                    format!("{other:?}"),
362
0
                )));
363
            }
364
        }
365
23
        let Value::Int(cursor) = value.next().unwrap() else {
  Branch (365:13): [True: 23, False: 0]
  Branch (365:13): [Folded - Ignored]
366
0
            return Err(RedisError::from((
367
0
                ErrorKind::Parse,
368
0
                "Expected integer as last element",
369
0
            )));
370
        };
371
23
        output.cursor = cursor as u64;
372
23
        Ok(output)
373
23
    }
374
}