/build/source/nativelink-store/src/redis_utils/ft_aggregate.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::collections::VecDeque; |
16 | | |
17 | | use fred::error::{RedisError, RedisErrorKind}; |
18 | | use fred::interfaces::RediSearchInterface; |
19 | | use fred::types::{FromRedis, RedisMap, RedisValue}; |
20 | | use futures::Stream; |
21 | | |
22 | | /// Calls FT_AGGREGATE in redis. Fred does not properly support this command |
23 | | /// so we have to manually handle it. |
24 | 2 | pub async fn ft_aggregate<C, I, Q>( |
25 | 2 | client: C, |
26 | 2 | index: I, |
27 | 2 | query: Q, |
28 | 2 | options: fred::types::FtAggregateOptions, |
29 | 2 | ) -> Result<impl Stream<Item = Result<RedisMap, RedisError>> + Send, RedisError> |
30 | 2 | where |
31 | 2 | C: RediSearchInterface, |
32 | 2 | I: Into<bytes_utils::string::Str>, |
33 | 2 | Q: Into<bytes_utils::string::Str>, |
34 | 2 | { |
35 | | struct State<C: RediSearchInterface> { |
36 | | client: C, |
37 | | index: bytes_utils::string::Str, |
38 | | data: RedisCursorData, |
39 | | } |
40 | | |
41 | 2 | let index = index.into(); |
42 | 2 | let query = query.into(); |
43 | 2 | let data: RedisCursorData1 = client.ft_aggregate(index.clone(), query, options).await?1 ; |
44 | | |
45 | 1 | let state = State { |
46 | 1 | client, |
47 | 1 | index, |
48 | 1 | data, |
49 | 1 | }; |
50 | 1 | Ok(futures::stream::unfold( |
51 | 1 | Some(state), |
52 | 1 | move |maybe_state| async move { |
53 | 1 | let mut state = maybe_state?0 ; |
54 | | loop { |
55 | 1 | if let Some(map0 ) = state.data.data.pop_front() { Branch (55:24): [True: 0, False: 0]
Branch (55:24): [Folded - Ignored]
Branch (55:24): [Folded - Ignored]
Branch (55:24): [True: 0, False: 1]
|
56 | 0 | return Some((Ok(map), Some(state))); |
57 | 1 | } |
58 | 1 | if state.data.cursor == 0 { Branch (58:20): [True: 0, False: 0]
Branch (58:20): [Folded - Ignored]
Branch (58:20): [Folded - Ignored]
Branch (58:20): [True: 1, False: 0]
|
59 | 1 | return None; |
60 | 0 | } |
61 | 0 | let data_res = state |
62 | 0 | .client |
63 | 0 | .ft_cursor_read(state.index.clone(), state.data.cursor, None) |
64 | 0 | .await; |
65 | 0 | state.data = match data_res { |
66 | 0 | Ok(data) => data, |
67 | 0 | Err(err) => return Some((Err(err), None)), |
68 | | }; |
69 | 0 | continue; |
70 | | } |
71 | 2 | }, |
72 | 1 | )) |
73 | 2 | } |
74 | | |
75 | | #[derive(Debug, Default)] |
76 | | struct RedisCursorData { |
77 | | total: u64, |
78 | | cursor: u64, |
79 | | data: VecDeque<RedisMap>, |
80 | | } |
81 | | |
82 | | impl FromRedis for RedisCursorData { |
83 | 1 | fn from_value(value: RedisValue) -> Result<Self, RedisError> { |
84 | 1 | if !value.is_array() { Branch (84:12): [True: 0, False: 1]
Branch (84:12): [Folded - Ignored]
|
85 | 0 | return Err(RedisError::new(RedisErrorKind::Protocol, "Expected array")); |
86 | 1 | } |
87 | 1 | let mut output = Self::default(); |
88 | 1 | let value = value.into_array(); |
89 | 1 | if value.len() < 2 { Branch (89:12): [True: 0, False: 1]
Branch (89:12): [Folded - Ignored]
|
90 | 0 | return Err(RedisError::new( |
91 | 0 | RedisErrorKind::Protocol, |
92 | 0 | "Expected at least 2 elements", |
93 | 0 | )); |
94 | 1 | } |
95 | 1 | let mut value = value.into_iter(); |
96 | 1 | let data_ary = value.next().unwrap().into_array(); |
97 | 1 | if data_ary.is_empty() { Branch (97:12): [True: 0, False: 1]
Branch (97:12): [Folded - Ignored]
|
98 | 0 | return Err(RedisError::new( |
99 | 0 | RedisErrorKind::Protocol, |
100 | 0 | "Expected at least 1 element in data array", |
101 | 0 | )); |
102 | 1 | } |
103 | 1 | let Some(total) = data_ary[0].as_u64() else { Branch (103:13): [True: 1, False: 0]
Branch (103:13): [Folded - Ignored]
|
104 | 0 | return Err(RedisError::new( |
105 | 0 | RedisErrorKind::Protocol, |
106 | 0 | "Expected integer as first element", |
107 | 0 | )); |
108 | | }; |
109 | 1 | output.total = total; |
110 | 1 | output.data.reserve(data_ary.len() - 1); |
111 | 1 | for map_data0 in data_ary.into_iter().skip(1) { |
112 | 0 | output.data.push_back(map_data.into_map()?); |
113 | | } |
114 | 1 | let Some(cursor) = value.next().unwrap().as_u64() else { Branch (114:13): [True: 1, False: 0]
Branch (114:13): [Folded - Ignored]
|
115 | 0 | return Err(RedisError::new( |
116 | 0 | RedisErrorKind::Protocol, |
117 | 0 | "Expected integer as last element", |
118 | 0 | )); |
119 | | }; |
120 | 1 | output.cursor = cursor; |
121 | 1 | Ok(output) |
122 | 1 | } |
123 | | } |