/build/source/nativelink-util/src/chunked_stream.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::collections::VecDeque; |
16 | | use std::ops::Bound; |
17 | | use std::pin::Pin; |
18 | | use std::task::{Context, Poll}; |
19 | | |
20 | | use futures::{Future, Stream}; |
21 | | use pin_project::pin_project; |
22 | | |
/// Internal polling state of a [`ChunkedStream`].
#[pin_project(project = StreamStateProj)]
enum StreamState<Fut> {
    /// A chunk request is in flight. The future is structurally pinned so
    /// that `poll_next` can poll it in place.
    Future(#[pin] Fut),
    /// No request is in flight. The next poll that finds the buffer empty
    /// calls `chunk_fn` to create a new future.
    Next,
}
28 | | |
/// A [`Stream`] that yields items one at a time while fetching them from the
/// underlying data source in chunks.
///
/// The stream is created from a range of keys and a `chunk_fn`. Whenever the
/// internal buffer is empty, `chunk_fn` is called with the current start
/// bound, the current end bound and an empty `VecDeque` it may fill. The
/// future it returns resolves to one of:
///
/// * `Ok(Some(((start, end), items)))` - `items` are yielded in order from
///   the front of the queue, and `start`/`end` become the bounds passed to
///   the next `chunk_fn` call.
/// * `Ok(None)` - the stream ends.
/// * `Err(e)` - `e` is yielded as a stream item.
///
/// A great use case is where you need to implement `Stream`, but accessing
/// the underlying data requires a lock, and the API does not require the data
/// to stay in sync with the data already received.
#[pin_project]
pub struct ChunkedStream<K, T, F, E, Fut>
where
    K: Ord,
    F: FnMut(Bound<K>, Bound<K>, VecDeque<T>) -> Fut,
    Fut: Future<Output = Result<Option<((Bound<K>, Bound<K>), VecDeque<T>)>, E>>,
{
    // Produces the future that fetches the next chunk.
    chunk_fn: F,
    // Items already fetched but not yet yielded.
    buffer: VecDeque<T>,
    // Start bound for the next `chunk_fn` call. Taken (left as `None`) while a
    // request is in flight and restored from the future's result.
    start_key: Option<Bound<K>>,
    // End bound for the next `chunk_fn` call. Handled the same way as
    // `start_key`.
    end_key: Option<Bound<K>>,
    // Whether a chunk request is currently in flight.
    #[pin]
    stream_state: StreamState<Fut>,
}
48 | | |
49 | | impl<K, T, F, E, Fut> ChunkedStream<K, T, F, E, Fut> |
50 | | where |
51 | | K: Ord, |
52 | | F: FnMut(Bound<K>, Bound<K>, VecDeque<T>) -> Fut, |
53 | | Fut: Future<Output = Result<Option<((Bound<K>, Bound<K>), VecDeque<T>)>, E>>, |
54 | | { |
55 | 87 | pub fn new(start_key: Bound<K>, end_key: Bound<K>, chunk_fn: F) -> Self { |
56 | 87 | Self { |
57 | 87 | chunk_fn, |
58 | 87 | buffer: VecDeque::new(), |
59 | 87 | start_key: Some(start_key), |
60 | 87 | end_key: Some(end_key), |
61 | 87 | stream_state: StreamState::Next, |
62 | 87 | } |
63 | 87 | } |
64 | | } |
65 | | |
66 | | impl<K, T, F, E, Fut> Stream for ChunkedStream<K, T, F, E, Fut> |
67 | | where |
68 | | K: Ord, |
69 | | F: FnMut(Bound<K>, Bound<K>, VecDeque<T>) -> Fut, |
70 | | Fut: Future<Output = Result<Option<((Bound<K>, Bound<K>), VecDeque<T>)>, E>>, |
71 | | { |
72 | | type Item = Result<T, E>; |
73 | | |
74 | 127 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
75 | 127 | let mut this = self.project(); |
76 | | loop { |
77 | 288 | if let Some(item40 ) = this.buffer.pop_front() { Branch (77:20): [True: 0, False: 0]
Branch (77:20): [True: 0, False: 0]
Branch (77:20): [Folded - Ignored]
Branch (77:20): [Folded - Ignored]
Branch (77:20): [True: 0, False: 0]
Branch (77:20): [True: 40, False: 248]
|
78 | 40 | return Poll::Ready(Some(Ok(item))); |
79 | 248 | } |
80 | 248 | match this.stream_state.as_mut().project() { |
81 | 124 | StreamStateProj::Future(fut) => { |
82 | 124 | match futures::ready!0 (fut.poll(cx)) { |
83 | 37 | Ok(Some(((start, end), mut buffer))) => { |
84 | 37 | *this.start_key = Some(start); |
85 | 37 | *this.end_key = Some(end); |
86 | 37 | std::mem::swap(&mut buffer, this.buffer); |
87 | 37 | } |
88 | 87 | Ok(None) => return Poll::Ready(None), // End of stream. |
89 | 0 | Err(err) => return Poll::Ready(Some(Err(err))), |
90 | | } |
91 | 37 | this.stream_state.set(StreamState::Next); |
92 | | // Loop again. |
93 | | } |
94 | 124 | StreamStateProj::Next => { |
95 | 124 | this.buffer.clear(); |
96 | 124 | // This trick is used to recycle capacity. |
97 | 124 | let buffer = std::mem::take(this.buffer); |
98 | 124 | let start_key = this |
99 | 124 | .start_key |
100 | 124 | .take() |
101 | 124 | .expect("start_key should never be None"); |
102 | 124 | let end_key = this.end_key.take().expect("end_key should never be None"); |
103 | 124 | let fut = (this.chunk_fn)(start_key, end_key, buffer); |
104 | 124 | this.stream_state.set(StreamState::Future(fut)); |
105 | 124 | // Loop again. |
106 | 124 | } |
107 | | } |
108 | | } |
109 | 127 | } |
110 | | } |