Coverage Report

Created: 2025-05-30 16:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/buf_channel.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use core::sync::atomic::{AtomicBool, Ordering};
17
use core::task::Poll;
18
use std::collections::VecDeque;
19
use std::sync::Arc;
20
21
use bytes::{Bytes, BytesMut};
22
use futures::task::Context;
23
use futures::{Future, Stream, TryFutureExt};
24
use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err};
25
use tokio::sync::mpsc;
26
use tracing::warn;
27
28
const ZERO_DATA: Bytes = Bytes::new();
29
30
/// Create a channel pair that can be used to transport buffer objects around to
31
/// different components. This wrapper is used because the streams give some
32
/// utility like managing EOF in a more friendly way, ensure if no EOF is received
33
/// it will send an error to the receiver channel before shutting down and count
34
/// the number of bytes sent.
35
#[must_use]
36
13.6k
pub fn make_buf_channel_pair() -> (DropCloserWriteHalf, DropCloserReadHalf) {
37
    // We allow up to 2 items in the buffer at any given time. There is no major
38
    // reason behind this magic number other than thinking it will be nice to give
39
    // a little time for another thread to wake up and consume data if another
40
    // thread is pumping large amounts of data into the channel.
41
13.6k
    let (tx, rx) = mpsc::channel(2);
42
13.6k
    let eof_sent = Arc::new(AtomicBool::new(false));
43
13.6k
    (
44
13.6k
        DropCloserWriteHalf {
45
13.6k
            tx: Some(tx),
46
13.6k
            bytes_written: 0,
47
13.6k
            eof_sent: eof_sent.clone(),
48
13.6k
        },
49
13.6k
        DropCloserReadHalf {
50
13.6k
            rx,
51
13.6k
            queued_data: VecDeque::new(),
52
13.6k
            last_err: None,
53
13.6k
            eof_sent,
54
13.6k
            bytes_received: 0,
55
13.6k
            recent_data: Vec::new(),
56
13.6k
            max_recent_data_size: 0,
57
13.6k
        },
58
13.6k
    )
59
13.6k
}
60
61
/// Writer half of the pair.
62
#[derive(Debug)]
63
pub struct DropCloserWriteHalf {
64
    tx: Option<mpsc::Sender<Bytes>>,
65
    bytes_written: u64,
66
    eof_sent: Arc<AtomicBool>,
67
}
68
69
impl DropCloserWriteHalf {
70
    /// Sends data over the channel to the receiver.
71
133k
    pub fn send(&mut self, buf: Bytes) -> impl Future<Output = Result<(), Error>> + '_ {
72
133k
        self.send_get_bytes_on_error(buf).map_err(|err| err.0)
73
133k
    }
74
75
    /// Sends data over the channel to the receiver.
76
    #[inline]
77
133k
    async fn send_get_bytes_on_error(&mut self, buf: Bytes) -> Result<(), (Error, Bytes)> {
78
133k
        let tx = match self
79
133k
            .tx
80
133k
            .as_ref()
81
133k
            .ok_or_else(|| make_err!(
Code::Internal0
, "Tried to send while stream is closed"))
82
        {
83
133k
            Ok(tx) => tx,
84
0
            Err(e) => return Err((e, buf)),
85
        };
86
133k
        let Ok(buf_len) = u64::try_from(buf.len()) else {
  Branch (86:13): [True: 13, False: 0]
  Branch (86:13): [True: 8.60k, False: 0]
  Branch (86:13): [Folded - Ignored]
  Branch (86:13): [True: 64, False: 0]
  Branch (86:13): [True: 2.02k, False: 0]
  Branch (86:13): [True: 1, False: 0]
  Branch (86:13): [True: 24, False: 0]
  Branch (86:13): [True: 5, False: 0]
  Branch (86:13): [True: 97, False: 0]
  Branch (86:13): [True: 4, False: 0]
  Branch (86:13): [True: 6, False: 0]
  Branch (86:13): [True: 1.06k, False: 0]
  Branch (86:13): [True: 102, False: 0]
  Branch (86:13): [True: 115k, False: 0]
  Branch (86:13): [True: 0, False: 0]
  Branch (86:13): [True: 0, False: 0]
  Branch (86:13): [True: 406, False: 0]
  Branch (86:13): [Folded - Ignored]
  Branch (86:13): [True: 5, False: 0]
  Branch (86:13): [True: 3, False: 0]
  Branch (86:13): [True: 98, False: 0]
  Branch (86:13): [True: 104, False: 0]
  Branch (86:13): [True: 5.01k, False: 0]
  Branch (86:13): [True: 6, False: 0]
  Branch (86:13): [True: 12, False: 0]
87
0
            return Err((
88
0
                make_err!(Code::Internal, "Could not convert usize to u64"),
89
0
                buf,
90
0
            ));
91
        };
92
133k
        if buf_len == 0 {
  Branch (92:12): [True: 0, False: 13]
  Branch (92:12): [True: 0, False: 8.60k]
  Branch (92:12): [Folded - Ignored]
  Branch (92:12): [True: 0, False: 64]
  Branch (92:12): [True: 0, False: 2.02k]
  Branch (92:12): [True: 0, False: 1]
  Branch (92:12): [True: 0, False: 24]
  Branch (92:12): [True: 0, False: 5]
  Branch (92:12): [True: 0, False: 97]
  Branch (92:12): [True: 0, False: 4]
  Branch (92:12): [True: 0, False: 6]
  Branch (92:12): [True: 0, False: 1.06k]
  Branch (92:12): [True: 0, False: 102]
  Branch (92:12): [True: 0, False: 115k]
  Branch (92:12): [True: 0, False: 0]
  Branch (92:12): [True: 0, False: 0]
  Branch (92:12): [True: 0, False: 406]
  Branch (92:12): [Folded - Ignored]
  Branch (92:12): [True: 0, False: 5]
  Branch (92:12): [True: 0, False: 3]
  Branch (92:12): [True: 0, False: 98]
  Branch (92:12): [True: 0, False: 104]
  Branch (92:12): [True: 0, False: 5.01k]
  Branch (92:12): [True: 0, False: 6]
  Branch (92:12): [True: 0, False: 12]
93
0
            return Err((
94
0
                make_input_err!("Cannot send EOF in send(). Instead use send_eof()"),
95
0
                buf,
96
0
            ));
97
133k
        }
98
133k
        if let Err(
err0
) = tx.send(buf).await {
  Branch (98:16): [True: 0, False: 13]
  Branch (98:16): [True: 0, False: 8.60k]
  Branch (98:16): [Folded - Ignored]
  Branch (98:16): [True: 0, False: 64]
  Branch (98:16): [True: 0, False: 2.02k]
  Branch (98:16): [True: 0, False: 1]
  Branch (98:16): [True: 0, False: 24]
  Branch (98:16): [True: 0, False: 5]
  Branch (98:16): [True: 0, False: 97]
  Branch (98:16): [True: 0, False: 4]
  Branch (98:16): [True: 0, False: 6]
  Branch (98:16): [True: 0, False: 1.06k]
  Branch (98:16): [True: 0, False: 102]
  Branch (98:16): [True: 0, False: 115k]
  Branch (98:16): [True: 0, False: 0]
  Branch (98:16): [True: 0, False: 0]
  Branch (98:16): [True: 0, False: 406]
  Branch (98:16): [Folded - Ignored]
  Branch (98:16): [True: 0, False: 5]
  Branch (98:16): [True: 0, False: 3]
  Branch (98:16): [True: 0, False: 98]
  Branch (98:16): [True: 0, False: 104]
  Branch (98:16): [True: 0, False: 5.01k]
  Branch (98:16): [True: 0, False: 6]
  Branch (98:16): [True: 0, False: 12]
99
            // Close our channel.
100
0
            self.tx = None;
101
0
            return Err((
102
0
                make_err!(
103
0
                    Code::Internal,
104
0
                    "Failed to write to data, receiver disconnected"
105
0
                ),
106
0
                err.0,
107
0
            ));
108
133k
        }
109
133k
        self.bytes_written += buf_len;
110
133k
        Ok(())
111
133k
    }
112
113
    /// Binds a reader and a writer together. This will send all the data from the reader
114
    /// to the writer until an EOF is received.
115
    /// This will always read one message ahead to ensure that if an error happens
116
    /// on the EOF message it will not forward on the last payload message and instead
117
    /// forward on the error.
118
2
    pub async fn bind_buffered(&mut self, reader: &mut DropCloserReadHalf) -> Result<(), Error> {
119
        loop {
120
52
            let chunk = reader
121
52
                .recv()
122
52
                .await
123
52
                .err_tip(|| "In DropCloserWriteHalf::bind_buffered::recv")
?0
;
124
52
            if chunk.is_empty() {
  Branch (124:16): [True: 0, False: 0]
  Branch (124:16): [Folded - Ignored]
  Branch (124:16): [True: 0, False: 1]
  Branch (124:16): [Folded - Ignored]
  Branch (124:16): [True: 1, False: 50]
125
1
                self.send_eof()
126
1
                    .err_tip(|| "In DropCloserWriteHalf::bind_buffered::send_eof")
?0
;
127
1
                break; // EOF.
128
51
            }
129
            // Always read one message ahead so if we get an error on our EOF
130
            // we forward it on to the reader.
131
51
            if reader.peek().await.is_err() {
  Branch (131:16): [True: 0, False: 0]
  Branch (131:16): [Folded - Ignored]
  Branch (131:16): [True: 1, False: 0]
  Branch (131:16): [Folded - Ignored]
  Branch (131:16): [True: 0, False: 50]
132
                // Read our next message for good book keeping.
133
0
                drop(
134
1
                    reader
135
1
                        .recv()
136
1
                        .await
137
1
                        .err_tip(|| "In DropCloserWriteHalf::bind_buffered::peek::eof")?,
138
                );
139
0
                return Err(make_err!(
140
0
                    Code::Internal,
141
0
                    "DropCloserReadHalf::peek() said error, but when data received said Ok. This should never happen."
142
0
                ));
143
50
            }
144
50
            match self.send_get_bytes_on_error(chunk).await {
145
50
                Ok(()) => {}
146
0
                Err(e) => {
147
0
                    reader.queued_data.push_front(e.1);
148
0
                    return Err(e.0).err_tip(|| "In DropCloserWriteHalf::bind_buffered::send");
149
                }
150
            }
151
        }
152
1
        Ok(())
153
2
    }
154
155
    /// Sends an EOF (End of File) message to the receiver which will gracefully let the
156
    /// stream know it has no more data. This will close the stream.
157
13.5k
    pub fn send_eof(&mut self) -> Result<(), Error> {
158
        // Flag that we have sent the EOF.
159
13.5k
        let eof_was_sent = self.eof_sent.swap(true, Ordering::Release);
160
13.5k
        if eof_was_sent {
  Branch (160:12): [True: 1, False: 13.5k]
  Branch (160:12): [Folded - Ignored]
161
1
            warn!(
162
1
                "Stream already closed when eof already was sent. This is often ok for retry was triggered, but should not happen on happy path."
163
            );
164
1
            return Ok(());
165
13.5k
        }
166
167
        // Now close our stream.
168
13.5k
        self.tx = None;
169
13.5k
        Ok(())
170
13.5k
    }
171
172
    /// Returns the number of bytes written so far. This does not mean the receiver received
173
    /// all of the bytes written to the stream so far.
174
    #[must_use]
175
141
    pub const fn get_bytes_written(&self) -> u64 {
176
141
        self.bytes_written
177
141
    }
178
179
    /// Returns if the pipe was broken. This is good for determining if the reader broke the
180
    /// pipe or the writer broke the pipe, since this will only return true if the pipe was
181
    /// broken by the writer.
182
    #[must_use]
183
2
    pub const fn is_pipe_broken(&self) -> bool {
184
2
        self.tx.is_none()
185
2
    }
186
}
187
188
/// Reader half of the pair.
189
#[derive(Debug)]
190
pub struct DropCloserReadHalf {
191
    rx: mpsc::Receiver<Bytes>,
192
    /// Number of bytes received over the stream.
193
    bytes_received: u64,
194
    eof_sent: Arc<AtomicBool>,
195
    /// If there was an error in the stream, this will be set to the last error.
196
    last_err: Option<Error>,
197
    /// If not empty, this is the data that needs to be sent out before
198
    /// data from the underlying channel can should be sent.
199
    queued_data: VecDeque<Bytes>,
200
    /// As data is being read from the stream, this buffer will be filled
201
    /// with the most recent data. Once `max_recent_data_size` is reached
202
    /// this buffer will be cleared and no longer be populated.
203
    /// This is useful if the caller wants to reset the the reader to before
204
    /// any of the data was received if possible (eg: something failed and
205
    /// we want to retry).
206
    recent_data: Vec<Bytes>,
207
    /// Amount of data to keep in the `recent_data` buffer before clearing it
208
    /// and no longer populating it.
209
    max_recent_data_size: u64,
210
}
211
212
impl DropCloserReadHalf {
213
    /// Returns if the stream has data ready.
214
0
    pub fn is_empty(&self) -> bool {
215
0
        self.rx.is_empty()
216
0
    }
217
218
146k
    fn recv_inner(&mut self, chunk: Bytes) -> Result<Bytes, Error> {
219
        // `queued_data` is allowed to have empty bytes that represent EOF
220
146k
        if chunk.is_empty() {
  Branch (220:12): [True: 13.1k, False: 133k]
  Branch (220:12): [Folded - Ignored]
221
13.1k
            if !self.eof_sent.load(Ordering::Acquire) {
  Branch (221:16): [True: 21, False: 13.0k]
  Branch (221:16): [Folded - Ignored]
222
21
                let err = make_err!(Code::Internal, "Sender dropped before sending EOF");
223
21
                self.queued_data.clear();
224
21
                self.recent_data.clear();
225
21
                self.bytes_received = 0;
226
21
                self.last_err = Some(err.clone());
227
21
                return Err(err);
228
13.0k
            }
229
230
13.0k
            self.maybe_populate_recent_data(&ZERO_DATA);
231
13.0k
            return Ok(ZERO_DATA);
232
133k
        }
233
234
133k
        self.bytes_received += chunk.len() as u64;
235
133k
        self.maybe_populate_recent_data(&chunk);
236
133k
        Ok(chunk)
237
146k
    }
238
239
    /// Try to receive a chunk of data, returning `None` if none is available.
240
169k
    pub fn try_recv(&mut self) -> Option<Result<Bytes, Error>> {
241
169k
        if let Some(
err2
) = &self.last_err {
  Branch (241:16): [True: 2, False: 169k]
  Branch (241:16): [Folded - Ignored]
242
2
            return Some(Err(err.clone()));
243
169k
        }
244
169k
        self.queued_data.pop_front().map(Ok)
245
169k
    }
246
247
    /// Receive a chunk of data, waiting asynchronously until some is available.
248
169k
    pub async fn recv(&mut self) -> Result<Bytes, Error> {
249
169k
        if let Some(
result23.2k
) = self.try_recv() {
  Branch (249:16): [True: 9.76k, False: 5]
  Branch (249:16): [True: 10.0k, False: 14.4k]
  Branch (249:16): [True: 0, False: 112]
  Branch (249:16): [True: 0, False: 2]
  Branch (249:16): [True: 0, False: 7]
  Branch (249:16): [True: 2.10k, False: 3.20k]
  Branch (249:16): [True: 0, False: 0]
  Branch (249:16): [True: 0, False: 35]
  Branch (249:16): [True: 571, False: 2.79k]
  Branch (249:16): [True: 639, False: 8.73k]
  Branch (249:16): [True: 0, False: 2]
  Branch (249:16): [True: 0, False: 24]
  Branch (249:16): [True: 5, False: 1.09k]
  Branch (249:16): [True: 7, False: 115k]
  Branch (249:16): [True: 0, False: 0]
  Branch (249:16): [True: 0, False: 42]
  Branch (249:16): [True: 0, False: 7]
  Branch (249:16): [Folded - Ignored]
  Branch (249:16): [True: 1, False: 9]
  Branch (249:16): [True: 0, False: 4]
  Branch (249:16): [True: 0, False: 233]
  Branch (249:16): [True: 54, False: 60]
  Branch (249:16): [True: 0, False: 10]
  Branch (249:16): [True: 0, False: 8]
  Branch (249:16): [True: 0, False: 0]
250
23.2k
            result
251
        } else {
252
            // `None` here indicates EOF, which we represent as Zero data
253
146k
            let 
data146k
= self.rx.recv().await.
unwrap_or146k
(
ZERO_DATA146k
);
254
146k
            self.recv_inner(data)
255
        }
256
169k
    }
257
258
146k
    fn maybe_populate_recent_data(&mut self, chunk: &Bytes) {
259
146k
        if self.max_recent_data_size == 0 {
  Branch (259:12): [True: 30.6k, False: 115k]
  Branch (259:12): [Folded - Ignored]
260
30.6k
            return; // Fast path.
261
115k
        }
262
115k
        if self.bytes_received > self.max_recent_data_size {
  Branch (262:12): [True: 62.9k, False: 52.5k]
  Branch (262:12): [Folded - Ignored]
263
62.9k
            if !self.recent_data.is_empty() {
  Branch (263:16): [True: 1, False: 62.9k]
  Branch (263:16): [Folded - Ignored]
264
1
                self.recent_data.clear();
265
62.9k
            }
266
62.9k
            return;
267
52.5k
        }
268
52.5k
        self.recent_data.push(chunk.clone());
269
146k
    }
270
271
    /// Sets the maximum size of the `recent_data` buffer. If the number of bytes
272
    /// received exceeds this size, the `recent_data` buffer will be cleared and
273
    /// no longer populated.
274
4
    pub const fn set_max_recent_data_size(&mut self, size: u64) {
275
4
        self.max_recent_data_size = size;
276
4
    }
277
278
    /// Attempts to reset the stream to before any data was received. This will
279
    /// only work if the number of bytes received is less than `max_recent_data_size`.
280
    ///
281
    /// On error the state of the stream is undefined and the caller should not
282
    /// attempt to use the stream again.
283
1
    pub fn try_reset_stream(&mut self) -> Result<(), Error> {
284
1
        if self.bytes_received > self.max_recent_data_size {
  Branch (284:12): [True: 0, False: 1]
  Branch (284:12): [Folded - Ignored]
285
0
            return Err(make_err!(
286
0
                Code::Internal,
287
0
                "Cannot reset stream, max_recent_data_size exceeded"
288
0
            ));
289
1
        }
290
1
        let mut data_sum = 0;
291
1
        for 
chunk0
in self.recent_data.drain(..).rev() {
292
0
            data_sum += chunk.len() as u64;
293
0
            self.queued_data.push_front(chunk);
294
0
        }
295
1
        assert!(self.recent_data.is_empty(), 
"Recent_data should be empty"0
);
296
        // Ensure the sum of the bytes in recent_data is equal to the bytes_received.
297
0
        error_if!(
298
1
            data_sum != self.bytes_received,
  Branch (298:13): [True: 0, False: 1]
  Branch (298:13): [Folded - Ignored]
299
            "Sum of recent_data bytes does not equal bytes_received"
300
        );
301
1
        self.bytes_received = 0;
302
1
        Ok(())
303
1
    }
304
305
    /// Drains the reader until an EOF is received, but sends data to the void.
306
1.00k
    pub async fn drain(&mut self) -> Result<(), Error> {
307
        loop {
308
1.00k
            if self
  Branch (308:16): [True: 2, False: 0]
  Branch (308:16): [Folded - Ignored]
  Branch (308:16): [True: 1.00k, False: 0]
  Branch (308:16): [True: 0, False: 0]
  Branch (308:16): [True: 1, False: 2]
  Branch (308:16): [Folded - Ignored]
309
1.00k
                .recv()
310
1.00k
                .await
311
1.00k
                .err_tip(|| "Failed to drain in buf_channel::drain")
?0
312
1.00k
                .is_empty()
313
            {
314
1.00k
                break; // EOF.
315
2
            }
316
        }
317
1.00k
        Ok(())
318
1.00k
    }
319
320
    /// Peek the next set of bytes in the stream without consuming them.
321
13.4k
    pub async fn peek(&mut self) -> Result<&Bytes, Error> {
322
13.4k
        if self.queued_data.is_empty() {
  Branch (322:12): [True: 2, False: 0]
  Branch (322:12): [True: 6.95k, False: 0]
  Branch (322:12): [Folded - Ignored]
  Branch (322:12): [True: 1, False: 0]
  Branch (322:12): [True: 1.36k, False: 0]
  Branch (322:12): [True: 0, False: 0]
  Branch (322:12): [True: 17, False: 0]
  Branch (322:12): [True: 1.02k, False: 0]
  Branch (322:12): [True: 3.89k, False: 0]
  Branch (322:12): [True: 1, False: 0]
  Branch (322:12): [True: 10, False: 0]
  Branch (322:12): [True: 6, False: 0]
  Branch (322:12): [True: 9, False: 0]
  Branch (322:12): [True: 0, False: 0]
  Branch (322:12): [True: 9, False: 0]
  Branch (322:12): [True: 2, False: 0]
  Branch (322:12): [Folded - Ignored]
  Branch (322:12): [True: 4, False: 0]
  Branch (322:12): [True: 2, False: 0]
  Branch (322:12): [True: 46, False: 0]
  Branch (322:12): [True: 53, False: 0]
  Branch (322:12): [True: 5, False: 0]
  Branch (322:12): [True: 4, False: 0]
  Branch (322:12): [True: 0, False: 0]
323
13.4k
            let 
chunk13.4k
= self.recv().await.err_tip(|| "In buf_channel::peek")
?5
;
324
13.4k
            self.queued_data.push_front(chunk);
325
0
        }
326
13.4k
        Ok(self
327
13.4k
            .queued_data
328
13.4k
            .front()
329
13.4k
            .expect("Should have data in the queue"))
330
13.4k
    }
331
332
    /// The number of bytes received over this stream so far.
333
0
    pub const fn get_bytes_received(&self) -> u64 {
334
0
        self.bytes_received
335
0
    }
336
337
    /// Takes exactly `size` number of bytes from the stream and returns them.
338
    /// This means the stream will keep polling until either an EOF is received or
339
    /// `size` bytes are received and concat them all together then return them.
340
    /// This method is optimized to reduce copies when possible.
341
    /// If `size` is None, it will take all the bytes in the stream.
342
34.2k
    pub async fn consume(&mut self, size: Option<usize>) -> Result<Bytes, Error> {
343
34.2k
        let size = size.unwrap_or(usize::MAX);
344
1.67k
        let first_chunk = {
345
34.2k
            let 
mut chunk34.1k
= self
346
34.2k
                .recv()
347
34.2k
                .await
348
34.2k
                .err_tip(|| "During first read of buf_channel::take()")
?11
;
349
34.1k
            if chunk.is_empty() {
  Branch (349:16): [True: 2, False: 9.76k]
  Branch (349:16): [True: 82, False: 17.0k]
  Branch (349:16): [Folded - Ignored]
  Branch (349:16): [True: 0, False: 1]
  Branch (349:16): [True: 0, False: 2.00k]
  Branch (349:16): [True: 0, False: 0]
  Branch (349:16): [True: 0, False: 17]
  Branch (349:16): [True: 207, False: 1.02k]
  Branch (349:16): [True: 60, False: 3.89k]
  Branch (349:16): [True: 0, False: 1]
  Branch (349:16): [True: 0, False: 10]
  Branch (349:16): [True: 1, False: 5]
  Branch (349:16): [True: 1, False: 9]
  Branch (349:16): [True: 0, False: 0]
  Branch (349:16): [True: 16, False: 9]
  Branch (349:16): [True: 2, False: 2]
  Branch (349:16): [Folded - Ignored]
  Branch (349:16): [True: 0, False: 4]
  Branch (349:16): [True: 0, False: 2]
  Branch (349:16): [True: 0, False: 46]
  Branch (349:16): [True: 4, False: 5]
  Branch (349:16): [True: 0, False: 5]
  Branch (349:16): [True: 0, False: 4]
  Branch (349:16): [True: 0, False: 0]
350
375
                return Ok(chunk); // EOF.
351
33.8k
            }
352
33.8k
            if chunk.len() > size {
  Branch (352:16): [True: 9.76k, False: 2]
  Branch (352:16): [True: 10.0k, False: 6.94k]
  Branch (352:16): [Folded - Ignored]
  Branch (352:16): [True: 0, False: 1]
  Branch (352:16): [True: 645, False: 1.36k]
  Branch (352:16): [True: 0, False: 0]
  Branch (352:16): [True: 0, False: 17]
  Branch (352:16): [True: 0, False: 1.02k]
  Branch (352:16): [True: 0, False: 3.89k]
  Branch (352:16): [True: 0, False: 1]
  Branch (352:16): [True: 0, False: 10]
  Branch (352:16): [True: 0, False: 5]
  Branch (352:16): [True: 0, False: 9]
  Branch (352:16): [True: 0, False: 0]
  Branch (352:16): [True: 0, False: 9]
  Branch (352:16): [True: 0, False: 2]
  Branch (352:16): [Folded - Ignored]
  Branch (352:16): [True: 0, False: 4]
  Branch (352:16): [True: 0, False: 2]
  Branch (352:16): [True: 0, False: 46]
  Branch (352:16): [True: 2, False: 3]
  Branch (352:16): [True: 0, False: 5]
  Branch (352:16): [True: 0, False: 4]
  Branch (352:16): [True: 0, False: 0]
353
20.4k
                let remaining = chunk.split_off(size);
354
20.4k
                self.queued_data.push_front(remaining);
355
                // No need to read EOF if we are a partial chunk.
356
20.4k
                return Ok(chunk);
357
13.3k
            }
358
            // Try to read our EOF to ensure our sender did not error out.
359
13.3k
            match self.peek().await {
360
13.3k
                Ok(peeked_chunk) => {
361
13.3k
                    if peeked_chunk.is_empty() || 
chunk1.85k
.len() == size {
  Branch (361:24): [True: 2, False: 0]
  Branch (361:51): [True: 0, False: 0]
  Branch (361:24): [True: 6.92k, False: 15]
  Branch (361:51): [True: 0, False: 15]
  Branch (361:24): [Folded - Ignored]
  Branch (361:51): [Folded - Ignored]
  Branch (361:24): [True: 1, False: 0]
  Branch (361:51): [True: 0, False: 0]
  Branch (361:24): [True: 748, False: 616]
  Branch (361:51): [True: 180, False: 436]
  Branch (361:24): [True: 0, False: 0]
  Branch (361:51): [True: 0, False: 0]
  Branch (361:24): [True: 17, False: 0]
  Branch (361:51): [True: 0, False: 0]
  Branch (361:24): [True: 454, False: 571]
  Branch (361:51): [True: 0, False: 571]
  Branch (361:24): [True: 3.25k, False: 639]
  Branch (361:51): [True: 0, False: 639]
  Branch (361:24): [True: 1, False: 0]
  Branch (361:51): [True: 0, False: 0]
  Branch (361:24): [True: 10, False: 0]
  Branch (361:51): [True: 0, False: 0]
  Branch (361:24): [True: 1, False: 4]
  Branch (361:51): [True: 1, False: 3]
  Branch (361:24): [True: 2, False: 7]
  Branch (361:51): [True: 0, False: 7]
  Branch (361:24): [True: 0, False: 0]
  Branch (361:51): [True: 0, False: 0]
  Branch (361:24): [True: 9, False: 0]
  Branch (361:51): [True: 0, False: 0]
  Branch (361:24): [True: 2, False: 0]
  Branch (361:51): [True: 0, False: 0]
  Branch (361:24): [Folded - Ignored]
  Branch (361:51): [Folded - Ignored]
  Branch (361:24): [True: 2, False: 2]
  Branch (361:51): [True: 1, False: 1]
  Branch (361:24): [True: 2, False: 0]
  Branch (361:51): [True: 0, False: 0]
  Branch (361:24): [True: 46, False: 0]
  Branch (361:51): [True: 0, False: 0]
  Branch (361:24): [True: 2, False: 1]
  Branch (361:51): [True: 0, False: 1]
  Branch (361:24): [True: 5, False: 0]
  Branch (361:51): [True: 0, False: 0]
  Branch (361:24): [True: 4, False: 0]
  Branch (361:51): [True: 0, False: 0]
  Branch (361:24): [True: 0, False: 0]
  Branch (361:51): [True: 0, False: 0]
362
11.6k
                        return Ok(chunk);
363
1.67k
                    }
364
                }
365
4
                Err(e) => {
366
4
                    return Err(e).err_tip(|| "Failed to check if next chunk is EOF")?;
367
                }
368
            }
369
1.67k
            chunk
370
        };
371
1.67k
        let mut output = BytesMut::new();
372
1.67k
        output.extend_from_slice(&first_chunk);
373
374
        loop {
375
120k
            let 
mut chunk120k
= self
376
120k
                .recv()
377
120k
                .await
378
120k
                .err_tip(|| "During next read of buf_channel::take()")
?1
;
379
120k
            if chunk.is_empty() {
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 14, False: 268]
  Branch (379:16): [Folded - Ignored]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 180, False: 746]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 271, False: 834]
  Branch (379:16): [True: 279, False: 1.23k]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 2, False: 1.04k]
  Branch (379:16): [True: 0, False: 115k]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [Folded - Ignored]
  Branch (379:16): [True: 0, False: 1]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 1]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 0]
380
746
                break; // EOF.
381
119k
            }
382
119k
            if output.len() + chunk.len() > size {
  Branch (382:16): [True: 0, False: 0]
  Branch (382:16): [True: 0, False: 268]
  Branch (382:16): [Folded - Ignored]
  Branch (382:16): [True: 0, False: 0]
  Branch (382:16): [True: 99, False: 647]
  Branch (382:16): [True: 0, False: 0]
  Branch (382:16): [True: 0, False: 0]
  Branch (382:16): [True: 0, False: 834]
  Branch (382:16): [True: 0, False: 1.23k]
  Branch (382:16): [True: 0, False: 0]
  Branch (382:16): [True: 0, False: 0]
  Branch (382:16): [True: 0, False: 1.04k]
  Branch (382:16): [True: 0, False: 115k]
  Branch (382:16): [True: 0, False: 0]
  Branch (382:16): [True: 0, False: 0]
  Branch (382:16): [True: 0, False: 0]
  Branch (382:16): [Folded - Ignored]
  Branch (382:16): [True: 0, False: 1]
  Branch (382:16): [True: 0, False: 0]
  Branch (382:16): [True: 0, False: 0]
  Branch (382:16): [True: 0, False: 1]
  Branch (382:16): [True: 0, False: 0]
  Branch (382:16): [True: 0, False: 0]
  Branch (382:16): [True: 0, False: 0]
383
99
                // Slice off the extra data and put it back into the queue. We are done.
384
99
                let remaining = chunk.split_off(size - output.len());
385
99
                self.queued_data.push_front(remaining);
386
119k
            }
387
119k
            output.extend_from_slice(&chunk);
388
119k
            if output.len() == size {
  Branch (388:16): [True: 0, False: 0]
  Branch (388:16): [True: 1, False: 267]
  Branch (388:16): [Folded - Ignored]
  Branch (388:16): [True: 0, False: 0]
  Branch (388:16): [True: 256, False: 490]
  Branch (388:16): [True: 0, False: 0]
  Branch (388:16): [True: 0, False: 0]
  Branch (388:16): [True: 300, False: 534]
  Branch (388:16): [True: 359, False: 880]
  Branch (388:16): [True: 0, False: 0]
  Branch (388:16): [True: 0, False: 0]
  Branch (388:16): [True: 1, False: 1.03k]
  Branch (388:16): [True: 7, False: 115k]
  Branch (388:16): [True: 0, False: 0]
  Branch (388:16): [True: 0, False: 0]
  Branch (388:16): [True: 0, False: 0]
  Branch (388:16): [Folded - Ignored]
  Branch (388:16): [True: 1, False: 0]
  Branch (388:16): [True: 0, False: 0]
  Branch (388:16): [True: 0, False: 0]
  Branch (388:16): [True: 1, False: 0]
  Branch (388:16): [True: 0, False: 0]
  Branch (388:16): [True: 0, False: 0]
  Branch (388:16): [True: 0, False: 0]
389
926
                break; // We are done.
390
118k
            }
391
        }
392
1.67k
        Ok(output.freeze())
393
34.2k
    }
394
}
395
396
impl Stream for DropCloserReadHalf {
397
    type Item = Result<Bytes, std::io::Error>;
398
399
    // TODO(aaronmondal) This is not very efficient as we are creating a new future on every
400
    // poll() call. It might be better to use a waker.
401
112
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
402
112
        Box::pin(self.recv())
403
112
            .as_mut()
404
112
            .poll(cx)
405
112
            .map(|result| match 
result80
{
406
79
                Ok(bytes) => {
407
79
                    if bytes.is_empty() {
  Branch (407:24): [True: 13, False: 66]
  Branch (407:24): [Folded - Ignored]
408
13
                        return None;
409
66
                    }
410
66
                    Some(Ok(bytes))
411
                }
412
1
                Err(e) => Some(Err(e.to_std_err())),
413
80
            })
414
112
    }
415
}