Coverage Report

Created: 2025-09-16 19:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/redis_utils/ft_aggregate.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::VecDeque;
16
17
use fred::error::{Error as RedisError, ErrorKind as RedisErrorKind};
18
use fred::interfaces::RediSearchInterface;
19
use fred::types::redisearch::FtAggregateOptions;
20
use fred::types::{FromValue, Map as RedisMap, Value as RedisValue};
21
use futures::Stream;
22
23
/// Calls `FT_AGGREGATE` in redis. Fred does not properly support this command
24
/// so we have to manually handle it.
25
22
pub(crate) async fn ft_aggregate<C, I, Q>(
26
22
    client: C,
27
22
    index: I,
28
22
    query: Q,
29
22
    options: FtAggregateOptions,
30
22
) -> Result<impl Stream<Item = Result<RedisMap, RedisError>> + Send, RedisError>
31
22
where
32
22
    C: RediSearchInterface,
33
22
    I: Into<bytes_utils::string::Str>,
34
22
    Q: Into<bytes_utils::string::Str>,
35
22
{
36
    struct State<C: RediSearchInterface> {
37
        client: C,
38
        index: bytes_utils::string::Str,
39
        data: RedisCursorData,
40
    }
41
42
22
    let index = index.into();
43
22
    let query = query.into();
44
22
    let 
data21
:
RedisCursorData21
= client.ft_aggregate(index.clone(), query, options).await
?1
;
45
46
21
    let state = State {
47
21
        client,
48
21
        index,
49
21
        data,
50
21
    };
51
21
    Ok(futures::stream::unfold(
52
21
        Some(state),
53
26
        move |maybe_state| async move {
54
26
            let mut state = maybe_state
?0
;
55
            loop {
56
26
                if let Some(
map7
) = state.data.data.pop_front() {
  Branch (56:24): [True: 0, False: 0]
  Branch (56:24): [Folded - Ignored]
  Branch (56:24): [Folded - Ignored]
  Branch (56:24): [True: 7, False: 19]
57
7
                    return Some((Ok(map), Some(state)));
58
19
                }
59
19
                if state.data.cursor == 0 {
  Branch (59:20): [True: 0, False: 0]
  Branch (59:20): [Folded - Ignored]
  Branch (59:20): [Folded - Ignored]
  Branch (59:20): [True: 19, False: 0]
60
19
                    return None;
61
0
                }
62
0
                let data_res = state
63
0
                    .client
64
0
                    .ft_cursor_read(state.index.clone(), state.data.cursor, None)
65
0
                    .await;
66
0
                state.data = match data_res {
67
0
                    Ok(data) => data,
68
0
                    Err(err) => return Some((Err(err), None)),
69
                };
70
            }
71
52
        },
72
    ))
73
22
}
74
75
#[derive(Debug, Default)]
76
struct RedisCursorData {
77
    total: u64,
78
    cursor: u64,
79
    data: VecDeque<RedisMap>,
80
}
81
82
impl FromValue for RedisCursorData {
83
21
    fn from_value(value: RedisValue) -> Result<Self, RedisError> {
84
21
        if !value.is_array() {
  Branch (84:12): [True: 0, False: 21]
  Branch (84:12): [Folded - Ignored]
85
0
            return Err(RedisError::new(RedisErrorKind::Protocol, "Expected array"));
86
21
        }
87
21
        let mut output = Self::default();
88
21
        let value = value.into_array();
89
21
        if value.len() < 2 {
  Branch (89:12): [True: 0, False: 21]
  Branch (89:12): [Folded - Ignored]
90
0
            return Err(RedisError::new(
91
0
                RedisErrorKind::Protocol,
92
0
                "Expected at least 2 elements",
93
0
            ));
94
21
        }
95
21
        let mut value = value.into_iter();
96
21
        let data_ary = value.next().unwrap().into_array();
97
21
        if data_ary.is_empty() {
  Branch (97:12): [True: 0, False: 21]
  Branch (97:12): [Folded - Ignored]
98
0
            return Err(RedisError::new(
99
0
                RedisErrorKind::Protocol,
100
0
                "Expected at least 1 element in data array",
101
0
            ));
102
21
        }
103
21
        let Some(total) = data_ary[0].as_u64() else {
  Branch (103:13): [True: 21, False: 0]
  Branch (103:13): [Folded - Ignored]
104
0
            return Err(RedisError::new(
105
0
                RedisErrorKind::Protocol,
106
0
                "Expected integer as first element",
107
0
            ));
108
        };
109
21
        output.total = total;
110
21
        output.data.reserve(data_ary.len() - 1);
111
21
        for 
map_data7
in data_ary.into_iter().skip(1) {
112
7
            output.data.push_back(map_data.into_map()
?0
);
113
        }
114
21
        let Some(cursor) = value.next().unwrap().as_u64() else {
  Branch (114:13): [True: 21, False: 0]
  Branch (114:13): [Folded - Ignored]
115
0
            return Err(RedisError::new(
116
0
                RedisErrorKind::Protocol,
117
0
                "Expected integer as last element",
118
0
            ));
119
        };
120
21
        output.cursor = cursor;
121
21
        Ok(output)
122
21
    }
123
}