/build/source/nativelink-util/src/chunked_stream.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::collections::VecDeque; |
16 | | use std::ops::Bound; |
17 | | use std::pin::Pin; |
18 | | use std::task::{Context, Poll}; |
19 | | |
20 | | use futures::{Future, Stream}; |
21 | | use pin_project::pin_project; |
22 | | |
23 | 0 | #[pin_project(project = StreamStateProj)] |
24 | | #[derive(Debug)] |
25 | | enum StreamState<Fut> { |
26 | | Future(#[pin] Fut), |
27 | | Next, |
28 | | } |
29 | | |
30 | | /// Takes a range of keys and a function that returns a future that yields |
31 | | /// an iterator of key-value pairs. The stream will yield all key-value pairs |
32 | | /// in the range, in order and buffered. A great use case is where you need |
33 | | /// to implement Stream, but to access the underlying data requires a lock, |
34 | | /// but API does not require the data to be in sync with data already received. |
35 | 1.13k | #[pin_project] |
36 | | #[derive(Debug)] |
37 | | pub struct ChunkedStream<K, T, F, E, Fut> |
38 | | where |
39 | | K: Ord, |
40 | | F: FnMut(Bound<K>, Bound<K>, VecDeque<T>) -> Fut, |
41 | | Fut: Future<Output = Result<Option<((Bound<K>, Bound<K>), VecDeque<T>)>, E>>, |
42 | | { |
43 | | chunk_fn: F, |
44 | | buffer: VecDeque<T>, |
45 | | start_key: Option<Bound<K>>, |
46 | | end_key: Option<Bound<K>>, |
47 | | #[pin] |
48 | | stream_state: StreamState<Fut>, |
49 | | } |
50 | | |
51 | | impl<K, T, F, E, Fut> ChunkedStream<K, T, F, E, Fut> |
52 | | where |
53 | | K: Ord, |
54 | | F: FnMut(Bound<K>, Bound<K>, VecDeque<T>) -> Fut, |
55 | | Fut: Future<Output = Result<Option<((Bound<K>, Bound<K>), VecDeque<T>)>, E>>, |
56 | | { |
57 | 590 | pub const fn new(start_key: Bound<K>, end_key: Bound<K>, chunk_fn: F) -> Self { |
58 | 590 | Self { |
59 | 590 | chunk_fn, |
60 | 590 | buffer: VecDeque::new(), |
61 | 590 | start_key: Some(start_key), |
62 | 590 | end_key: Some(end_key), |
63 | 590 | stream_state: StreamState::Next, |
64 | 590 | } |
65 | 590 | } |
66 | | } |
67 | | |
68 | | impl<K, T, F, E, Fut> Stream for ChunkedStream<K, T, F, E, Fut> |
69 | | where |
70 | | K: Ord, |
71 | | F: FnMut(Bound<K>, Bound<K>, VecDeque<T>) -> Fut, |
72 | | Fut: Future<Output = Result<Option<((Bound<K>, Bound<K>), VecDeque<T>)>, E>>, |
73 | | { |
74 | | type Item = Result<T, E>; |
75 | | |
76 | 1.13k | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
77 | 1.13k | let mut this = self.project(); |
78 | | loop { |
79 | 2.80k | if let Some(item543 ) = this.buffer.pop_front() { Branch (79:20): [True: 0, False: 0]
Branch (79:20): [True: 0, False: 0]
Branch (79:20): [Folded - Ignored]
Branch (79:20): [Folded - Ignored]
Branch (79:20): [True: 0, False: 0]
Branch (79:20): [True: 543, False: 2.26k]
|
80 | 543 | return Poll::Ready(Some(Ok(item))); |
81 | 2.26k | } |
82 | 2.26k | match this.stream_state.as_mut().project() { |
83 | 1.13k | StreamStateProj::Future(fut) => { |
84 | 1.13k | match futures::ready!0 (fut.poll(cx)) { |
85 | 540 | Ok(Some(((start, end), mut buffer))) => { |
86 | 540 | *this.start_key = Some(start); |
87 | 540 | *this.end_key = Some(end); |
88 | 540 | std::mem::swap(&mut buffer, this.buffer); |
89 | 540 | } |
90 | 590 | Ok(None) => return Poll::Ready(None), // End of stream. |
91 | 0 | Err(err) => return Poll::Ready(Some(Err(err))), |
92 | | } |
93 | 540 | this.stream_state.set(StreamState::Next); |
94 | | // Loop again. |
95 | | } |
96 | 1.13k | StreamStateProj::Next => { |
97 | 1.13k | this.buffer.clear(); |
98 | 1.13k | // This trick is used to recycle capacity. |
99 | 1.13k | let buffer = std::mem::take(this.buffer); |
100 | 1.13k | let start_key = this |
101 | 1.13k | .start_key |
102 | 1.13k | .take() |
103 | 1.13k | .expect("start_key should never be None"); |
104 | 1.13k | let end_key = this.end_key.take().expect("end_key should never be None"); |
105 | 1.13k | let fut = (this.chunk_fn)(start_key, end_key, buffer); |
106 | 1.13k | this.stream_state.set(StreamState::Future(fut)); |
107 | 1.13k | // Loop again. |
108 | 1.13k | } |
109 | | } |
110 | | } |
111 | 1.13k | } |
112 | | } |