Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-store/src/redis_utils/ft_aggregate.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::VecDeque;
16
17
use fred::error::{RedisError, RedisErrorKind};
18
use fred::interfaces::RediSearchInterface;
19
use fred::types::{FromRedis, RedisMap, RedisValue};
20
use futures::Stream;
21
22
/// Calls FT_AGGREGATE in redis. Fred does not properly support this command
23
/// so we have to manually handle it.
24
2
pub async fn ft_aggregate<C, I, Q>(
25
2
    client: C,
26
2
    index: I,
27
2
    query: Q,
28
2
    options: fred::types::FtAggregateOptions,
29
2
) -> Result<impl Stream<Item = Result<RedisMap, RedisError>> + Send, RedisError>
30
2
where
31
2
    C: RediSearchInterface,
32
2
    I: Into<bytes_utils::string::Str>,
33
2
    Q: Into<bytes_utils::string::Str>,
34
2
{
35
2
    let index = index.into();
36
2
    let query = query.into();
37
2
    let 
data: RedisCursorData1
= client.ft_aggregate(index.clone(), query, options).await
?1
;
38
39
    struct State<C: RediSearchInterface> {
40
        client: C,
41
        index: bytes_utils::string::Str,
42
        data: RedisCursorData,
43
    }
44
45
1
    let state = State {
46
1
        client,
47
1
        index,
48
1
        data,
49
1
    };
50
1
    Ok(futures::stream::unfold(
51
1
        Some(state),
52
1
        move |maybe_state| async move {
53
1
            let mut state = maybe_state
?0
;
54
            loop {
55
1
                if let Some(
map0
) = state.data.data.pop_front() {
  Branch (55:24): [True: 0, False: 0]
  Branch (55:24): [Folded - Ignored]
  Branch (55:24): [Folded - Ignored]
  Branch (55:24): [True: 0, False: 1]
56
0
                    return Some((Ok(map), Some(state)));
57
                } else {
58
1
                    if state.data.cursor == 0 {
  Branch (58:24): [True: 0, False: 0]
  Branch (58:24): [Folded - Ignored]
  Branch (58:24): [Folded - Ignored]
  Branch (58:24): [True: 1, False: 0]
59
1
                        return None;
60
0
                    }
61
0
                    let data_res = state
62
0
                        .client
63
0
                        .ft_cursor_read(state.index.clone(), state.data.cursor, None)
64
0
                        .await;
65
0
                    state.data = match data_res {
66
0
                        Ok(data) => data,
67
0
                        Err(err) => return Some((Err(err), None)),
68
                    };
69
0
                    continue;
70
                }
71
            }
72
2
        },
73
1
    ))
74
2
}
75
76
#[derive(Debug, Default)]
77
struct RedisCursorData {
78
    total: u64,
79
    cursor: u64,
80
    data: VecDeque<RedisMap>,
81
}
82
83
impl FromRedis for RedisCursorData {
84
1
    fn from_value(value: RedisValue) -> Result<Self, RedisError> {
85
1
        if !value.is_array() {
  Branch (85:12): [True: 0, False: 1]
  Branch (85:12): [Folded - Ignored]
86
0
            return Err(RedisError::new(RedisErrorKind::Protocol, "Expected array"));
87
1
        }
88
1
        let mut output = Self::default();
89
1
        let value = value.into_array();
90
1
        if value.len() < 2 {
  Branch (90:12): [True: 0, False: 1]
  Branch (90:12): [Folded - Ignored]
91
0
            return Err(RedisError::new(
92
0
                RedisErrorKind::Protocol,
93
0
                "Expected at least 2 elements",
94
0
            ));
95
1
        }
96
1
        let mut value = value.into_iter();
97
1
        let data_ary = value.next().unwrap().into_array();
98
1
        if data_ary.is_empty() {
  Branch (98:12): [True: 0, False: 1]
  Branch (98:12): [Folded - Ignored]
99
0
            return Err(RedisError::new(
100
0
                RedisErrorKind::Protocol,
101
0
                "Expected at least 1 element in data array",
102
0
            ));
103
1
        }
104
1
        let Some(total) = data_ary[0].as_u64() else {
  Branch (104:13): [True: 1, False: 0]
  Branch (104:13): [Folded - Ignored]
105
0
            return Err(RedisError::new(
106
0
                RedisErrorKind::Protocol,
107
0
                "Expected integer as first element",
108
0
            ));
109
        };
110
1
        output.total = total;
111
1
        output.data.reserve(data_ary.len() - 1);
112
1
        for 
map_data0
in data_ary.into_iter().skip(1) {
113
0
            output.data.push_back(map_data.into_map()?);
114
        }
115
1
        let Some(cursor) = value.next().unwrap().as_u64() else {
  Branch (115:13): [True: 1, False: 0]
  Branch (115:13): [Folded - Ignored]
116
0
            return Err(RedisError::new(
117
0
                RedisErrorKind::Protocol,
118
0
                "Expected integer as last element",
119
0
            ));
120
        };
121
1
        output.cursor = cursor;
122
1
        Ok(output)
123
1
    }
124
}