Coverage Report

Created: 2024-11-22 20:17

/build/source/nativelink-store/src/redis_utils/ft_aggregate.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::VecDeque;
16
17
use fred::error::{RedisError, RedisErrorKind};
18
use fred::interfaces::RediSearchInterface;
19
use fred::types::{FromRedis, RedisMap, RedisValue};
20
use futures::Stream;
21
22
/// Calls FT_AGGREGATE in redis. Fred does not properly support this command
23
/// so we have to manually handle it.
24
2
pub async fn ft_aggregate<C, I, Q>(
25
2
    client: C,
26
2
    index: I,
27
2
    query: Q,
28
2
    options: fred::types::FtAggregateOptions,
29
2
) -> Result<impl Stream<Item = Result<RedisMap, RedisError>> + Send, RedisError>
30
2
where
31
2
    C: RediSearchInterface,
32
2
    I: Into<bytes_utils::string::Str>,
33
2
    Q: Into<bytes_utils::string::Str>,
34
2
{
35
    struct State<C: RediSearchInterface> {
36
        client: C,
37
        index: bytes_utils::string::Str,
38
        data: RedisCursorData,
39
    }
40
41
2
    let index = index.into();
42
2
    let query = query.into();
43
2
    let 
data: RedisCursorData1
= client.ft_aggregate(index.clone(), query, options).await
?1
;
44
45
1
    let state = State {
46
1
        client,
47
1
        index,
48
1
        data,
49
1
    };
50
1
    Ok(futures::stream::unfold(
51
1
        Some(state),
52
1
        move |maybe_state| async move {
53
1
            let mut state = maybe_state
?0
;
54
            loop {
55
1
                if let Some(
map0
) = state.data.data.pop_front() {
  Branch (55:24): [True: 0, False: 0]
  Branch (55:24): [Folded - Ignored]
  Branch (55:24): [Folded - Ignored]
  Branch (55:24): [True: 0, False: 1]
56
0
                    return Some((Ok(map), Some(state)));
57
1
                }
58
1
                if state.data.cursor == 0 {
  Branch (58:20): [True: 0, False: 0]
  Branch (58:20): [Folded - Ignored]
  Branch (58:20): [Folded - Ignored]
  Branch (58:20): [True: 1, False: 0]
59
1
                    return None;
60
0
                }
61
0
                let data_res = state
62
0
                    .client
63
0
                    .ft_cursor_read(state.index.clone(), state.data.cursor, None)
64
0
                    .await;
65
0
                state.data = match data_res {
66
0
                    Ok(data) => data,
67
0
                    Err(err) => return Some((Err(err), None)),
68
                };
69
0
                continue;
70
            }
71
2
        },
72
1
    ))
73
2
}
74
75
#[derive(Debug, Default)]
76
struct RedisCursorData {
77
    total: u64,
78
    cursor: u64,
79
    data: VecDeque<RedisMap>,
80
}
81
82
impl FromRedis for RedisCursorData {
83
1
    fn from_value(value: RedisValue) -> Result<Self, RedisError> {
84
1
        if !value.is_array() {
  Branch (84:12): [True: 0, False: 1]
  Branch (84:12): [Folded - Ignored]
85
0
            return Err(RedisError::new(RedisErrorKind::Protocol, "Expected array"));
86
1
        }
87
1
        let mut output = Self::default();
88
1
        let value = value.into_array();
89
1
        if value.len() < 2 {
  Branch (89:12): [True: 0, False: 1]
  Branch (89:12): [Folded - Ignored]
90
0
            return Err(RedisError::new(
91
0
                RedisErrorKind::Protocol,
92
0
                "Expected at least 2 elements",
93
0
            ));
94
1
        }
95
1
        let mut value = value.into_iter();
96
1
        let data_ary = value.next().unwrap().into_array();
97
1
        if data_ary.is_empty() {
  Branch (97:12): [True: 0, False: 1]
  Branch (97:12): [Folded - Ignored]
98
0
            return Err(RedisError::new(
99
0
                RedisErrorKind::Protocol,
100
0
                "Expected at least 1 element in data array",
101
0
            ));
102
1
        }
103
1
        let Some(total) = data_ary[0].as_u64() else {
  Branch (103:13): [True: 1, False: 0]
  Branch (103:13): [Folded - Ignored]
104
0
            return Err(RedisError::new(
105
0
                RedisErrorKind::Protocol,
106
0
                "Expected integer as first element",
107
0
            ));
108
        };
109
1
        output.total = total;
110
1
        output.data.reserve(data_ary.len() - 1);
111
1
        for 
map_data0
in data_ary.into_iter().skip(1) {
112
0
            output.data.push_back(map_data.into_map()?);
113
        }
114
1
        let Some(cursor) = value.next().unwrap().as_u64() else {
  Branch (114:13): [True: 1, False: 0]
  Branch (114:13): [Folded - Ignored]
115
0
            return Err(RedisError::new(
116
0
                RedisErrorKind::Protocol,
117
0
                "Expected integer as last element",
118
0
            ));
119
        };
120
1
        output.cursor = cursor;
121
1
        Ok(output)
122
1
    }
123
}