Coverage Report

Created: 2025-04-19 16:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/buf_channel.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::VecDeque;
16
use std::pin::Pin;
17
use std::sync::Arc;
18
use std::sync::atomic::{AtomicBool, Ordering};
19
use std::task::Poll;
20
21
use bytes::{Bytes, BytesMut};
22
use futures::task::Context;
23
use futures::{Future, Stream, TryFutureExt};
24
use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err};
25
use tokio::sync::mpsc;
26
use tracing::{Level, event};
27
28
const ZERO_DATA: Bytes = Bytes::new();
29
30
/// Create a channel pair that can be used to transport buffer objects around to
31
/// different components. This wrapper is used because the streams give some
32
/// utility like managing EOF in a more friendly way, ensure if no EOF is received
33
/// it will send an error to the receiver channel before shutting down and count
34
/// the number of bytes sent.
35
#[must_use]
36
13.6k
pub fn make_buf_channel_pair() -> (DropCloserWriteHalf, DropCloserReadHalf) {
37
13.6k
    // We allow up to 2 items in the buffer at any given time. There is no major
38
13.6k
    // reason behind this magic number other than thinking it will be nice to give
39
13.6k
    // a little time for another thread to wake up and consume data if another
40
13.6k
    // thread is pumping large amounts of data into the channel.
41
13.6k
    let (tx, rx) = mpsc::channel(2);
42
13.6k
    let eof_sent = Arc::new(AtomicBool::new(false));
43
13.6k
    (
44
13.6k
        DropCloserWriteHalf {
45
13.6k
            tx: Some(tx),
46
13.6k
            bytes_written: 0,
47
13.6k
            eof_sent: eof_sent.clone(),
48
13.6k
        },
49
13.6k
        DropCloserReadHalf {
50
13.6k
            rx,
51
13.6k
            queued_data: VecDeque::new(),
52
13.6k
            last_err: None,
53
13.6k
            eof_sent,
54
13.6k
            bytes_received: 0,
55
13.6k
            recent_data: Vec::new(),
56
13.6k
            max_recent_data_size: 0,
57
13.6k
        },
58
13.6k
    )
59
13.6k
}
60
61
/// Writer half of the pair.
62
#[derive(Debug)]
63
pub struct DropCloserWriteHalf {
64
    tx: Option<mpsc::Sender<Bytes>>,
65
    bytes_written: u64,
66
    eof_sent: Arc<AtomicBool>,
67
}
68
69
impl DropCloserWriteHalf {
70
    /// Sends data over the channel to the receiver.
71
17.5k
    pub fn send(&mut self, buf: Bytes) -> impl Future<Output = Result<(), Error>> + '_ {
72
17.5k
        self.send_get_bytes_on_error(buf).map_err(|err| 
err.00
)
73
17.5k
    }
74
75
    /// Sends data over the channel to the receiver.
76
    #[inline]
77
17.5k
    async fn send_get_bytes_on_error(&mut self, buf: Bytes) -> Result<(), (Error, Bytes)> {
78
17.5k
        let tx = match self
79
17.5k
            .tx
80
17.5k
            .as_ref()
81
17.5k
            .ok_or_else(|| 
make_err!(Code::Internal, "Tried to send while stream is closed")0
)
82
        {
83
17.5k
            Ok(tx) => tx,
84
0
            Err(e) => return Err((e, buf)),
85
        };
86
17.5k
        let Ok(buf_len) = u64::try_from(buf.len()) else {
  Branch (86:13): [True: 13, False: 0]
  Branch (86:13): [True: 8.60k, False: 0]
  Branch (86:13): [Folded - Ignored]
  Branch (86:13): [True: 64, False: 0]
  Branch (86:13): [True: 2.02k, False: 0]
  Branch (86:13): [True: 1, False: 0]
  Branch (86:13): [True: 24, False: 0]
  Branch (86:13): [True: 5, False: 0]
  Branch (86:13): [True: 97, False: 0]
  Branch (86:13): [True: 4, False: 0]
  Branch (86:13): [True: 6, False: 0]
  Branch (86:13): [True: 1.06k, False: 0]
  Branch (86:13): [True: 0, False: 0]
  Branch (86:13): [True: 0, False: 0]
  Branch (86:13): [True: 406, False: 0]
  Branch (86:13): [Folded - Ignored]
  Branch (86:13): [True: 5, False: 0]
  Branch (86:13): [True: 3, False: 0]
  Branch (86:13): [True: 98, False: 0]
  Branch (86:13): [True: 104, False: 0]
  Branch (86:13): [True: 5.01k, False: 0]
  Branch (86:13): [True: 6, False: 0]
  Branch (86:13): [True: 12, False: 0]
87
0
            return Err((
88
0
                make_err!(Code::Internal, "Could not convert usize to u64"),
89
0
                buf,
90
0
            ));
91
        };
92
17.5k
        if buf_len == 0 {
  Branch (92:12): [True: 0, False: 13]
  Branch (92:12): [True: 0, False: 8.60k]
  Branch (92:12): [Folded - Ignored]
  Branch (92:12): [True: 0, False: 64]
  Branch (92:12): [True: 0, False: 2.02k]
  Branch (92:12): [True: 0, False: 1]
  Branch (92:12): [True: 0, False: 24]
  Branch (92:12): [True: 0, False: 5]
  Branch (92:12): [True: 0, False: 97]
  Branch (92:12): [True: 0, False: 4]
  Branch (92:12): [True: 0, False: 6]
  Branch (92:12): [True: 0, False: 1.06k]
  Branch (92:12): [True: 0, False: 0]
  Branch (92:12): [True: 0, False: 0]
  Branch (92:12): [True: 0, False: 406]
  Branch (92:12): [Folded - Ignored]
  Branch (92:12): [True: 0, False: 5]
  Branch (92:12): [True: 0, False: 3]
  Branch (92:12): [True: 0, False: 98]
  Branch (92:12): [True: 0, False: 104]
  Branch (92:12): [True: 0, False: 5.01k]
  Branch (92:12): [True: 0, False: 6]
  Branch (92:12): [True: 0, False: 12]
93
0
            return Err((
94
0
                make_input_err!("Cannot send EOF in send(). Instead use send_eof()"),
95
0
                buf,
96
0
            ));
97
17.5k
        }
98
17.5k
        if let Err(
err0
) = tx.send(buf).await {
  Branch (98:16): [True: 0, False: 13]
  Branch (98:16): [True: 0, False: 8.60k]
  Branch (98:16): [Folded - Ignored]
  Branch (98:16): [True: 0, False: 64]
  Branch (98:16): [True: 0, False: 2.02k]
  Branch (98:16): [True: 0, False: 1]
  Branch (98:16): [True: 0, False: 24]
  Branch (98:16): [True: 0, False: 5]
  Branch (98:16): [True: 0, False: 97]
  Branch (98:16): [True: 0, False: 4]
  Branch (98:16): [True: 0, False: 6]
  Branch (98:16): [True: 0, False: 1.06k]
  Branch (98:16): [True: 0, False: 0]
  Branch (98:16): [True: 0, False: 0]
  Branch (98:16): [True: 0, False: 406]
  Branch (98:16): [Folded - Ignored]
  Branch (98:16): [True: 0, False: 5]
  Branch (98:16): [True: 0, False: 3]
  Branch (98:16): [True: 0, False: 98]
  Branch (98:16): [True: 0, False: 104]
  Branch (98:16): [True: 0, False: 5.01k]
  Branch (98:16): [True: 0, False: 6]
  Branch (98:16): [True: 0, False: 12]
99
            // Close our channel.
100
0
            self.tx = None;
101
0
            return Err((
102
0
                make_err!(
103
0
                    Code::Internal,
104
0
                    "Failed to write to data, receiver disconnected"
105
0
                ),
106
0
                err.0,
107
0
            ));
108
17.5k
        }
109
17.5k
        self.bytes_written += buf_len;
110
17.5k
        Ok(())
111
17.5k
    }
112
113
    /// Binds a reader and a writer together. This will send all the data from the reader
114
    /// to the writer until an EOF is received.
115
    /// This will always read one message ahead to ensure that if an error happens
116
    /// on the EOF message it will not forward on the last payload message and instead
117
    /// forward on the error.
118
2
    pub async fn bind_buffered(&mut self, reader: &mut DropCloserReadHalf) -> Result<(), Error> {
119
        loop {
120
52
            let chunk = reader
121
52
                .recv()
122
52
                .await
123
52
                .err_tip(|| 
"In DropCloserWriteHalf::bind_buffered::recv"0
)
?0
;
124
52
            if chunk.is_empty() {
  Branch (124:16): [True: 0, False: 0]
  Branch (124:16): [Folded - Ignored]
  Branch (124:16): [True: 0, False: 1]
  Branch (124:16): [Folded - Ignored]
  Branch (124:16): [True: 1, False: 50]
125
1
                self.send_eof()
126
1
                    .err_tip(|| 
"In DropCloserWriteHalf::bind_buffered::send_eof"0
)
?0
;
127
1
                break; // EOF.
128
51
            }
129
51
            // Always read one message ahead so if we get an error on our EOF
130
51
            // we forward it on to the reader.
131
51
            if reader.peek().await.is_err() {
  Branch (131:16): [True: 0, False: 0]
  Branch (131:16): [Folded - Ignored]
  Branch (131:16): [True: 1, False: 0]
  Branch (131:16): [Folded - Ignored]
  Branch (131:16): [True: 0, False: 50]
132
                // Read our next message for good book keeping.
133
0
                drop(
134
1
                    reader
135
1
                        .recv()
136
1
                        .await
137
1
                        .err_tip(|| "In DropCloserWriteHalf::bind_buffered::peek::eof")?,
138
                );
139
0
                return Err(make_err!(
140
0
                    Code::Internal,
141
0
                    "DropCloserReadHalf::peek() said error, but when data received said Ok. This should never happen."
142
0
                ));
143
50
            }
144
50
            match self.send_get_bytes_on_error(chunk).await {
145
50
                Ok(()) => {}
146
0
                Err(e) => {
147
0
                    reader.queued_data.push_front(e.1);
148
0
                    return Err(e.0).err_tip(|| "In DropCloserWriteHalf::bind_buffered::send");
149
                }
150
            }
151
        }
152
1
        Ok(())
153
2
    }
154
155
    /// Sends an EOF (End of File) message to the receiver which will gracefully let the
156
    /// stream know it has no more data. This will close the stream.
157
13.5k
    pub fn send_eof(&mut self) -> Result<(), Error> {
158
13.5k
        // Flag that we have sent the EOF.
159
13.5k
        let eof_was_sent = self.eof_sent.swap(true, Ordering::Release);
160
13.5k
        if eof_was_sent {
  Branch (160:12): [True: 1, False: 13.5k]
  Branch (160:12): [Folded - Ignored]
161
1
            event!(
162
1
                Level::WARN,
163
1
                "Stream already closed when eof already was sent. This is often ok for retry was triggered, but should not happen on happy path."
164
            );
165
1
            return Ok(());
166
13.5k
        }
167
13.5k
168
13.5k
        // Now close our stream.
169
13.5k
        self.tx = None;
170
13.5k
        Ok(())
171
13.5k
    }
172
173
    /// Returns the number of bytes written so far. This does not mean the receiver received
174
    /// all of the bytes written to the stream so far.
175
    #[must_use]
176
141
    pub const fn get_bytes_written(&self) -> u64 {
177
141
        self.bytes_written
178
141
    }
179
180
    /// Returns if the pipe was broken. This is good for determining if the reader broke the
181
    /// pipe or the writer broke the pipe, since this will only return true if the pipe was
182
    /// broken by the writer.
183
    #[must_use]
184
0
    pub const fn is_pipe_broken(&self) -> bool {
185
0
        self.tx.is_none()
186
0
    }
187
}
188
189
/// Reader half of the pair.
190
#[derive(Debug)]
191
pub struct DropCloserReadHalf {
192
    rx: mpsc::Receiver<Bytes>,
193
    /// Number of bytes received over the stream.
194
    bytes_received: u64,
195
    eof_sent: Arc<AtomicBool>,
196
    /// If there was an error in the stream, this will be set to the last error.
197
    last_err: Option<Error>,
198
    /// If not empty, this is the data that needs to be sent out before
199
    /// data from the underlying channel can should be sent.
200
    queued_data: VecDeque<Bytes>,
201
    /// As data is being read from the stream, this buffer will be filled
202
    /// with the most recent data. Once `max_recent_data_size` is reached
203
    /// this buffer will be cleared and no longer be populated.
204
    /// This is useful if the caller wants to reset the the reader to before
205
    /// any of the data was received if possible (eg: something failed and
206
    /// we want to retry).
207
    recent_data: Vec<Bytes>,
208
    /// Amount of data to keep in the `recent_data` buffer before clearing it
209
    /// and no longer populating it.
210
    max_recent_data_size: u64,
211
}
212
213
impl DropCloserReadHalf {
214
    /// Returns if the stream has data ready.
215
0
    pub fn is_empty(&self) -> bool {
216
0
        self.rx.is_empty()
217
0
    }
218
219
30.6k
    fn recv_inner(&mut self, chunk: Bytes) -> Result<Bytes, Error> {
220
30.6k
        // `queued_data` is allowed to have empty bytes that represent EOF
221
30.6k
        if chunk.is_empty() {
  Branch (221:12): [True: 13.1k, False: 17.5k]
  Branch (221:12): [Folded - Ignored]
222
13.1k
            if !self.eof_sent.load(Ordering::Acquire) {
  Branch (222:16): [True: 21, False: 13.0k]
  Branch (222:16): [Folded - Ignored]
223
21
                let err = make_err!(Code::Internal, "Sender dropped before sending EOF");
224
21
                self.queued_data.clear();
225
21
                self.recent_data.clear();
226
21
                self.bytes_received = 0;
227
21
                self.last_err = Some(err.clone());
228
21
                return Err(err);
229
13.0k
            };
230
13.0k
231
13.0k
            self.maybe_populate_recent_data(&ZERO_DATA);
232
13.0k
            return Ok(ZERO_DATA);
233
17.5k
        };
234
17.5k
235
17.5k
        self.bytes_received += chunk.len() as u64;
236
17.5k
        self.maybe_populate_recent_data(&chunk);
237
17.5k
        Ok(chunk)
238
30.6k
    }
239
240
    /// Try to receive a chunk of data, returning `None` if none is available.
241
53.9k
    pub fn try_recv(&mut self) -> Option<Result<Bytes, Error>> {
242
53.9k
        if let Some(
err2
) = &self.last_err {
  Branch (242:16): [True: 2, False: 53.9k]
  Branch (242:16): [Folded - Ignored]
243
2
            return Some(Err(err.clone()));
244
53.9k
        }
245
53.9k
        self.queued_data.pop_front().map(Ok)
246
53.9k
    }
247
248
    /// Receive a chunk of data, waiting asynchronously until some is available.
249
53.9k
    pub async fn recv(&mut self) -> Result<Bytes, Error> {
250
53.9k
        if let Some(
result23.2k
) = self.try_recv() {
  Branch (250:16): [True: 9.76k, False: 5]
  Branch (250:16): [True: 10.0k, False: 14.3k]
  Branch (250:16): [True: 0, False: 112]
  Branch (250:16): [True: 0, False: 2]
  Branch (250:16): [True: 0, False: 6]
  Branch (250:16): [True: 2.10k, False: 3.20k]
  Branch (250:16): [True: 0, False: 0]
  Branch (250:16): [True: 0, False: 35]
  Branch (250:16): [True: 571, False: 2.79k]
  Branch (250:16): [True: 639, False: 8.73k]
  Branch (250:16): [True: 0, False: 2]
  Branch (250:16): [True: 0, False: 24]
  Branch (250:16): [True: 5, False: 1.09k]
  Branch (250:16): [True: 0, False: 0]
  Branch (250:16): [True: 0, False: 42]
  Branch (250:16): [True: 0, False: 7]
  Branch (250:16): [Folded - Ignored]
  Branch (250:16): [True: 1, False: 9]
  Branch (250:16): [True: 0, False: 4]
  Branch (250:16): [True: 0, False: 233]
  Branch (250:16): [True: 54, False: 60]
  Branch (250:16): [True: 0, False: 10]
  Branch (250:16): [True: 0, False: 8]
  Branch (250:16): [True: 0, False: 0]
251
23.2k
            result
252
        } else {
253
            // `None` here indicates EOF, which we represent as Zero data
254
30.6k
            let 
data30.6k
= self.rx.recv().await.unwrap_or(ZERO_DATA);
255
30.6k
            self.recv_inner(data)
256
        }
257
53.8k
    }
258
259
30.6k
    fn maybe_populate_recent_data(&mut self, chunk: &Bytes) {
260
30.6k
        if self.max_recent_data_size == 0 {
  Branch (260:12): [True: 30.5k, False: 52]
  Branch (260:12): [Folded - Ignored]
261
30.5k
            return; // Fast path.
262
52
        }
263
52
        if self.bytes_received > self.max_recent_data_size {
  Branch (263:12): [True: 0, False: 52]
  Branch (263:12): [Folded - Ignored]
264
0
            if !self.recent_data.is_empty() {
  Branch (264:16): [True: 0, False: 0]
  Branch (264:16): [Folded - Ignored]
265
0
                self.recent_data.clear();
266
0
            }
267
0
            return;
268
52
        }
269
52
        self.recent_data.push(chunk.clone());
270
30.6k
    }
271
272
    /// Sets the maximum size of the `recent_data` buffer. If the number of bytes
273
    /// received exceeds this size, the `recent_data` buffer will be cleared and
274
    /// no longer populated.
275
2
    pub const fn set_max_recent_data_size(&mut self, size: u64) {
276
2
        self.max_recent_data_size = size;
277
2
    }
278
279
    /// Attempts to reset the stream to before any data was received. This will
280
    /// only work if the number of bytes received is less than `max_recent_data_size`.
281
    ///
282
    /// On error the state of the stream is undefined and the caller should not
283
    /// attempt to use the stream again.
284
1
    pub fn try_reset_stream(&mut self) -> Result<(), Error> {
285
1
        if self.bytes_received > self.max_recent_data_size {
  Branch (285:12): [True: 0, False: 1]
  Branch (285:12): [Folded - Ignored]
286
0
            return Err(make_err!(
287
0
                Code::Internal,
288
0
                "Cannot reset stream, max_recent_data_size exceeded"
289
0
            ));
290
1
        }
291
1
        let mut data_sum = 0;
292
1
        for 
chunk0
in self.recent_data.drain(..).rev() {
293
0
            data_sum += chunk.len() as u64;
294
0
            self.queued_data.push_front(chunk);
295
0
        }
296
1
        assert!(self.recent_data.is_empty(), 
"Recent_data should be empty"0
);
297
        // Ensure the sum of the bytes in recent_data is equal to the bytes_received.
298
0
        error_if!(
299
1
            data_sum != self.bytes_received,
  Branch (299:13): [True: 0, False: 1]
  Branch (299:13): [Folded - Ignored]
300
            "Sum of recent_data bytes does not equal bytes_received"
301
        );
302
1
        self.bytes_received = 0;
303
1
        Ok(())
304
1
    }
305
306
    /// Drains the reader until an EOF is received, but sends data to the void.
307
1.00k
    pub async fn drain(&mut self) -> Result<(), Error> {
308
        loop {
309
1.00k
            if self
  Branch (309:16): [True: 2, False: 0]
  Branch (309:16): [Folded - Ignored]
  Branch (309:16): [True: 1.00k, False: 0]
  Branch (309:16): [True: 0, False: 0]
  Branch (309:16): [True: 1, False: 2]
  Branch (309:16): [Folded - Ignored]
310
1.00k
                .recv()
311
1.00k
                .await
312
1.00k
                .err_tip(|| 
"Failed to drain in buf_channel::drain"0
)
?0
313
1.00k
                .is_empty()
314
            {
315
1.00k
                break; // EOF.
316
2
            }
317
        }
318
1.00k
        Ok(())
319
1.00k
    }
320
321
    /// Peek the next set of bytes in the stream without consuming them.
322
13.3k
    pub async fn peek(&mut self) -> Result<&Bytes, Error> {
323
13.3k
        if self.queued_data.is_empty() {
  Branch (323:12): [True: 2, False: 0]
  Branch (323:12): [True: 6.94k, False: 0]
  Branch (323:12): [Folded - Ignored]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 1.36k, False: 0]
  Branch (323:12): [True: 0, False: 0]
  Branch (323:12): [True: 17, False: 0]
  Branch (323:12): [True: 1.02k, False: 0]
  Branch (323:12): [True: 3.89k, False: 0]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 10, False: 0]
  Branch (323:12): [True: 6, False: 0]
  Branch (323:12): [True: 0, False: 0]
  Branch (323:12): [True: 9, False: 0]
  Branch (323:12): [True: 2, False: 0]
  Branch (323:12): [Folded - Ignored]
  Branch (323:12): [True: 4, False: 0]
  Branch (323:12): [True: 2, False: 0]
  Branch (323:12): [True: 46, False: 0]
  Branch (323:12): [True: 53, False: 0]
  Branch (323:12): [True: 5, False: 0]
  Branch (323:12): [True: 4, False: 0]
  Branch (323:12): [True: 0, False: 0]
324
13.3k
            let 
chunk13.3k
= self.recv().await.err_tip(||
"In buf_channel::peek"5
)
?5
;
325
13.3k
            self.queued_data.push_front(chunk);
326
0
        }
327
13.3k
        Ok(self
328
13.3k
            .queued_data
329
13.3k
            .front()
330
13.3k
            .expect("Should have data in the queue"))
331
13.3k
    }
332
333
    /// The number of bytes received over this stream so far.
334
0
    pub const fn get_bytes_received(&self) -> u64 {
335
0
        self.bytes_received
336
0
    }
337
338
    /// Takes exactly `size` number of bytes from the stream and returns them.
339
    /// This means the stream will keep polling until either an EOF is received or
340
    /// `size` bytes are received and concat them all together then return them.
341
    /// This method is optimized to reduce copies when possible.
342
    /// If `size` is None, it will take all the bytes in the stream.
343
34.1k
    pub async fn consume(&mut self, size: Option<usize>) -> Result<Bytes, Error> {
344
34.1k
        let size = size.unwrap_or(usize::MAX);
345
1.66k
        let first_chunk = {
346
34.1k
            let 
mut chunk34.1k
= self
347
34.1k
                .recv()
348
34.1k
                .await
349
34.1k
                .err_tip(|| 
"During first read of buf_channel::take()"11
)
?11
;
350
34.1k
            if chunk.is_empty() {
  Branch (350:16): [True: 2, False: 9.76k]
  Branch (350:16): [True: 81, False: 16.9k]
  Branch (350:16): [Folded - Ignored]
  Branch (350:16): [True: 0, False: 1]
  Branch (350:16): [True: 0, False: 2.00k]
  Branch (350:16): [True: 0, False: 0]
  Branch (350:16): [True: 0, False: 17]
  Branch (350:16): [True: 207, False: 1.02k]
  Branch (350:16): [True: 60, False: 3.89k]
  Branch (350:16): [True: 0, False: 1]
  Branch (350:16): [True: 0, False: 10]
  Branch (350:16): [True: 1, False: 5]
  Branch (350:16): [True: 0, False: 0]
  Branch (350:16): [True: 16, False: 9]
  Branch (350:16): [True: 2, False: 2]
  Branch (350:16): [Folded - Ignored]
  Branch (350:16): [True: 0, False: 4]
  Branch (350:16): [True: 0, False: 2]
  Branch (350:16): [True: 0, False: 46]
  Branch (350:16): [True: 4, False: 5]
  Branch (350:16): [True: 0, False: 5]
  Branch (350:16): [True: 0, False: 4]
  Branch (350:16): [True: 0, False: 0]
351
373
                return Ok(chunk); // EOF.
352
33.8k
            }
353
33.8k
            if chunk.len() > size {
  Branch (353:16): [True: 9.76k, False: 2]
  Branch (353:16): [True: 10.0k, False: 6.94k]
  Branch (353:16): [Folded - Ignored]
  Branch (353:16): [True: 0, False: 1]
  Branch (353:16): [True: 645, False: 1.36k]
  Branch (353:16): [True: 0, False: 0]
  Branch (353:16): [True: 0, False: 17]
  Branch (353:16): [True: 0, False: 1.02k]
  Branch (353:16): [True: 0, False: 3.89k]
  Branch (353:16): [True: 0, False: 1]
  Branch (353:16): [True: 0, False: 10]
  Branch (353:16): [True: 0, False: 5]
  Branch (353:16): [True: 0, False: 0]
  Branch (353:16): [True: 0, False: 9]
  Branch (353:16): [True: 0, False: 2]
  Branch (353:16): [Folded - Ignored]
  Branch (353:16): [True: 0, False: 4]
  Branch (353:16): [True: 0, False: 2]
  Branch (353:16): [True: 0, False: 46]
  Branch (353:16): [True: 2, False: 3]
  Branch (353:16): [True: 0, False: 5]
  Branch (353:16): [True: 0, False: 4]
  Branch (353:16): [True: 0, False: 0]
354
20.4k
                let remaining = chunk.split_off(size);
355
20.4k
                self.queued_data.push_front(remaining);
356
20.4k
                // No need to read EOF if we are a partial chunk.
357
20.4k
                return Ok(chunk);
358
13.3k
            }
359
13.3k
            // Try to read our EOF to ensure our sender did not error out.
360
13.3k
            match self.peek().await {
361
13.3k
                Ok(peeked_chunk) => {
362
13.3k
                    if peeked_chunk.is_empty() || 
chunk.len() == size1.84k
{
  Branch (362:24): [True: 2, False: 0]
  Branch (362:51): [True: 0, False: 0]
  Branch (362:24): [True: 6.92k, False: 13]
  Branch (362:51): [True: 0, False: 13]
  Branch (362:24): [Folded - Ignored]
  Branch (362:51): [Folded - Ignored]
  Branch (362:24): [True: 1, False: 0]
  Branch (362:51): [True: 0, False: 0]
  Branch (362:24): [True: 748, False: 616]
  Branch (362:51): [True: 180, False: 436]
  Branch (362:24): [True: 0, False: 0]
  Branch (362:51): [True: 0, False: 0]
  Branch (362:24): [True: 17, False: 0]
  Branch (362:51): [True: 0, False: 0]
  Branch (362:24): [True: 454, False: 571]
  Branch (362:51): [True: 0, False: 571]
  Branch (362:24): [True: 3.25k, False: 639]
  Branch (362:51): [True: 0, False: 639]
  Branch (362:24): [True: 1, False: 0]
  Branch (362:51): [True: 0, False: 0]
  Branch (362:24): [True: 10, False: 0]
  Branch (362:51): [True: 0, False: 0]
  Branch (362:24): [True: 1, False: 4]
  Branch (362:51): [True: 1, False: 3]
  Branch (362:24): [True: 0, False: 0]
  Branch (362:51): [True: 0, False: 0]
  Branch (362:24): [True: 9, False: 0]
  Branch (362:51): [True: 0, False: 0]
  Branch (362:24): [True: 2, False: 0]
  Branch (362:51): [True: 0, False: 0]
  Branch (362:24): [Folded - Ignored]
  Branch (362:51): [Folded - Ignored]
  Branch (362:24): [True: 2, False: 2]
  Branch (362:51): [True: 1, False: 1]
  Branch (362:24): [True: 2, False: 0]
  Branch (362:51): [True: 0, False: 0]
  Branch (362:24): [True: 46, False: 0]
  Branch (362:51): [True: 0, False: 0]
  Branch (362:24): [True: 2, False: 1]
  Branch (362:51): [True: 0, False: 1]
  Branch (362:24): [True: 5, False: 0]
  Branch (362:51): [True: 0, False: 0]
  Branch (362:24): [True: 4, False: 0]
  Branch (362:51): [True: 0, False: 0]
  Branch (362:24): [True: 0, False: 0]
  Branch (362:51): [True: 0, False: 0]
363
11.6k
                        return Ok(chunk);
364
1.66k
                    }
365
                }
366
4
                Err(e) => {
367
4
                    return Err(e.clone()).err_tip(|| "Failed to check if next chunk is EOF")?;
368
                }
369
            }
370
1.66k
            chunk
371
1.66k
        };
372
1.66k
        let mut output = BytesMut::new();
373
1.66k
        output.extend_from_slice(&first_chunk);
374
375
        loop {
376
4.77k
            let 
mut chunk4.77k
= self
377
4.77k
                .recv()
378
4.77k
                .await
379
4.77k
                .err_tip(|| 
"During next read of buf_channel::take()"1
)
?1
;
380
4.77k
            if chunk.is_empty() {
  Branch (380:16): [True: 0, False: 0]
  Branch (380:16): [True: 13, False: 168]
  Branch (380:16): [Folded - Ignored]
  Branch (380:16): [True: 0, False: 0]
  Branch (380:16): [True: 180, False: 746]
  Branch (380:16): [True: 0, False: 0]
  Branch (380:16): [True: 0, False: 0]
  Branch (380:16): [True: 271, False: 834]
  Branch (380:16): [True: 279, False: 1.23k]
  Branch (380:16): [True: 0, False: 0]
  Branch (380:16): [True: 0, False: 0]
  Branch (380:16): [True: 2, False: 1.04k]
  Branch (380:16): [True: 0, False: 0]
  Branch (380:16): [True: 0, False: 0]
  Branch (380:16): [True: 0, False: 0]
  Branch (380:16): [Folded - Ignored]
  Branch (380:16): [True: 0, False: 1]
  Branch (380:16): [True: 0, False: 0]
  Branch (380:16): [True: 0, False: 0]
  Branch (380:16): [True: 0, False: 1]
  Branch (380:16): [True: 0, False: 0]
  Branch (380:16): [True: 0, False: 0]
  Branch (380:16): [True: 0, False: 0]
381
745
                break; // EOF.
382
4.02k
            }
383
4.02k
            if output.len() + chunk.len() > size {
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [True: 0, False: 168]
  Branch (383:16): [Folded - Ignored]
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [True: 99, False: 647]
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [True: 0, False: 834]
  Branch (383:16): [True: 0, False: 1.23k]
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [True: 0, False: 1.04k]
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [Folded - Ignored]
  Branch (383:16): [True: 0, False: 1]
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [True: 0, False: 1]
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [True: 0, False: 0]
384
99
                // Slice off the extra data and put it back into the queue. We are done.
385
99
                let remaining = chunk.split_off(size - output.len());
386
99
                self.queued_data.push_front(remaining);
387
3.93k
            }
388
4.02k
            output.extend_from_slice(&chunk);
389
4.02k
            if output.len() == size {
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [True: 0, False: 168]
  Branch (389:16): [Folded - Ignored]
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [True: 256, False: 490]
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [True: 300, False: 534]
  Branch (389:16): [True: 359, False: 880]
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [True: 1, False: 1.03k]
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [Folded - Ignored]
  Branch (389:16): [True: 1, False: 0]
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [True: 1, False: 0]
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [True: 0, False: 0]
  Branch (389:16): [True: 0, False: 0]
390
918
                break; // We are done.
391
3.11k
            }
392
        }
393
1.66k
        Ok(output.freeze())
394
34.1k
    }
395
}
396
397
impl Stream for DropCloserReadHalf {
398
    type Item = Result<Bytes, std::io::Error>;
399
400
    // TODO(blaise.bruer) This is not very efficient as we are creating a new future on every
401
    // poll() call. It might be better to use a waker.
402
112
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
403
112
        Box::pin(self.recv())
404
112
            .as_mut()
405
112
            .poll(cx)
406
112
            .map(|result| 
match result80
{
407
79
                Ok(bytes) => {
408
79
                    if bytes.is_empty() {
  Branch (408:24): [True: 13, False: 66]
  Branch (408:24): [Folded - Ignored]
409
13
                        return None;
410
66
                    }
411
66
                    Some(Ok(bytes))
412
                }
413
1
                Err(e) => Some(Err(e.to_std_err())),
414
80
            })
415
112
    }
416
}