Coverage Report

Created: 2025-04-19 16:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/chunked_stream.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::VecDeque;
16
use std::ops::Bound;
17
use std::pin::Pin;
18
use std::task::{Context, Poll};
19
20
use futures::{Future, Stream};
21
use pin_project::pin_project;
22
23
0
#[pin_project(project = StreamStateProj)]
24
#[derive(Debug)]
25
enum StreamState<Fut> {
26
    Future(#[pin] Fut),
27
    Next,
28
}
29
30
/// Takes a range of keys and a function that returns a future that yields
31
/// an iterator of key-value pairs. The stream will yield all key-value pairs
32
/// in the range, in order and buffered. A great use case is where you need
33
/// to implement Stream, but to access the underlying data requires a lock,
34
/// but API does not require the data to be in sync with data already received.
35
1.13k
#[pin_project]
36
#[derive(Debug)]
37
pub struct ChunkedStream<K, T, F, E, Fut>
38
where
39
    K: Ord,
40
    F: FnMut(Bound<K>, Bound<K>, VecDeque<T>) -> Fut,
41
    Fut: Future<Output = Result<Option<((Bound<K>, Bound<K>), VecDeque<T>)>, E>>,
42
{
43
    chunk_fn: F,
44
    buffer: VecDeque<T>,
45
    start_key: Option<Bound<K>>,
46
    end_key: Option<Bound<K>>,
47
    #[pin]
48
    stream_state: StreamState<Fut>,
49
}
50
51
impl<K, T, F, E, Fut> ChunkedStream<K, T, F, E, Fut>
52
where
53
    K: Ord,
54
    F: FnMut(Bound<K>, Bound<K>, VecDeque<T>) -> Fut,
55
    Fut: Future<Output = Result<Option<((Bound<K>, Bound<K>), VecDeque<T>)>, E>>,
56
{
57
590
    pub const fn new(start_key: Bound<K>, end_key: Bound<K>, chunk_fn: F) -> Self {
58
590
        Self {
59
590
            chunk_fn,
60
590
            buffer: VecDeque::new(),
61
590
            start_key: Some(start_key),
62
590
            end_key: Some(end_key),
63
590
            stream_state: StreamState::Next,
64
590
        }
65
590
    }
66
}
67
68
impl<K, T, F, E, Fut> Stream for ChunkedStream<K, T, F, E, Fut>
69
where
70
    K: Ord,
71
    F: FnMut(Bound<K>, Bound<K>, VecDeque<T>) -> Fut,
72
    Fut: Future<Output = Result<Option<((Bound<K>, Bound<K>), VecDeque<T>)>, E>>,
73
{
74
    type Item = Result<T, E>;
75
76
1.13k
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77
1.13k
        let mut this = self.project();
78
        loop {
79
2.80k
            if let Some(item) = this.buffer.pop_front() {
  Branch (79:20): [True: 0, False: 0]
  Branch (79:20): [True: 0, False: 0]
  Branch (79:20): [Folded - Ignored]
  Branch (79:20): [Folded - Ignored]
  Branch (79:20): [True: 0, False: 0]
  Branch (79:20): [True: 543, False: 2.26k]
80
543
                return Poll::Ready(Some(Ok(item)));
81
2.26k
            }
82
2.26k
            match this.stream_state.as_mut().project() {
83
1.13k
                StreamStateProj::Future(fut) => {
84
1.13k
                    match futures::ready!(fut.poll(cx)) {
85
540
                        Ok(Some(((start, end), mut buffer))) => {
86
540
                            *this.start_key = Some(start);
87
540
                            *this.end_key = Some(end);
88
540
                            std::mem::swap(&mut buffer, this.buffer);
89
540
                        }
90
590
                        Ok(None) => return Poll::Ready(None), // End of stream.
91
0
                        Err(err) => return Poll::Ready(Some(Err(err))),
92
                    }
93
540
                    this.stream_state.set(StreamState::Next);
94
                    // Loop again.
95
                }
96
1.13k
                StreamStateProj::Next => {
97
1.13k
                    this.buffer.clear();
98
1.13k
                    // This trick is used to recycle capacity.
99
1.13k
                    let buffer = std::mem::take(this.buffer);
100
1.13k
                    let start_key = this
101
1.13k
                        .start_key
102
1.13k
                        .take()
103
1.13k
                        .expect("start_key should never be None");
104
1.13k
                    let end_key = this.end_key.take().expect("end_key should never be None");
105
1.13k
                    let fut = (this.chunk_fn)(start_key, end_key, buffer);
106
1.13k
                    this.stream_state.set(StreamState::Future(fut));
107
1.13k
                    // Loop again.
108
1.13k
                }
109
            }
110
        }
111
1.13k
    }
112
}