Coverage Report

Created: 2024-11-20 10:13

/build/source/nativelink-util/src/buf_channel.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::VecDeque;
16
use std::pin::Pin;
17
use std::sync::atomic::{AtomicBool, Ordering};
18
use std::sync::Arc;
19
use std::task::Poll;
20
21
use bytes::{Bytes, BytesMut};
22
use futures::task::Context;
23
use futures::{Future, Stream, TryFutureExt};
24
use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt};
25
use tokio::sync::mpsc;
26
use tracing::{event, Level};
27
28
/// Zero-length buffer. Within this module an empty chunk stands for EOF on the
/// read side (a closed channel is mapped to this value before further checks).
const ZERO_DATA: Bytes = Bytes::new();
29
30
/// Create a channel pair that can be used to transport buffer objects around to
31
/// different components. This wrapper is used because the streams give some
32
/// utility like managing EOF in a more friendly way, ensure if no EOF is received
33
/// it will send an error to the receiver channel before shutting down and count
34
/// the number of bytes sent.
35
#[must_use]
36
13.5k
pub fn make_buf_channel_pair() -> (DropCloserWriteHalf, DropCloserReadHalf) {
37
13.5k
    // We allow up to 2 items in the buffer at any given time. There is no major
38
13.5k
    // reason behind this magic number other than thinking it will be nice to give
39
13.5k
    // a little time for another thread to wake up and consume data if another
40
13.5k
    // thread is pumping large amounts of data into the channel.
41
13.5k
    let (tx, rx) = mpsc::channel(2);
42
13.5k
    let eof_sent = Arc::new(AtomicBool::new(false));
43
13.5k
    (
44
13.5k
        DropCloserWriteHalf {
45
13.5k
            tx: Some(tx),
46
13.5k
            bytes_written: 0,
47
13.5k
            eof_sent: eof_sent.clone(),
48
13.5k
        },
49
13.5k
        DropCloserReadHalf {
50
13.5k
            rx,
51
13.5k
            queued_data: VecDeque::new(),
52
13.5k
            last_err: None,
53
13.5k
            eof_sent,
54
13.5k
            bytes_received: 0,
55
13.5k
            recent_data: Vec::new(),
56
13.5k
            max_recent_data_size: 0,
57
13.5k
        },
58
13.5k
    )
59
13.5k
}
60
61
/// Writer half of the pair.
62
pub struct DropCloserWriteHalf {
63
    tx: Option<mpsc::Sender<Bytes>>,
64
    bytes_written: u64,
65
    eof_sent: Arc<AtomicBool>,
66
}
67
68
impl DropCloserWriteHalf {
69
    /// Sends data over the channel to the receiver.
70
17.6k
    pub fn send(&mut self, buf: Bytes) -> impl Future<Output = Result<(), Error>> + '_ {
71
17.6k
        self.send_get_bytes_on_error(buf).map_err(|err| 
err.00
)
72
17.6k
    }
73
74
    /// Sends data over the channel to the receiver.
75
    #[inline]
76
17.6k
    async fn send_get_bytes_on_error(&mut self, buf: Bytes) -> Result<(), (Error, Bytes)> {
77
17.6k
        let tx = match self
78
17.6k
            .tx
79
17.6k
            .as_ref()
80
17.6k
            .ok_or_else(|| 
make_err!(Code::Internal, "Tried to send while stream is closed")0
)
81
        {
82
17.6k
            Ok(tx) => tx,
83
0
            Err(e) => return Err((e, buf)),
84
        };
85
17.6k
        let Ok(buf_len) = u64::try_from(buf.len()) else {
  Branch (85:13): [True: 13, False: 0]
  Branch (85:13): [True: 8.74k, False: 0]
  Branch (85:13): [Folded - Ignored]
  Branch (85:13): [True: 64, False: 0]
  Branch (85:13): [True: 2.02k, False: 0]
  Branch (85:13): [True: 1, False: 0]
  Branch (85:13): [True: 24, False: 0]
  Branch (85:13): [True: 5, False: 0]
  Branch (85:13): [True: 111, False: 0]
  Branch (85:13): [True: 4, False: 0]
  Branch (85:13): [True: 6, False: 0]
  Branch (85:13): [True: 1.06k, False: 0]
  Branch (85:13): [True: 0, False: 0]
  Branch (85:13): [True: 0, False: 0]
  Branch (85:13): [True: 406, False: 0]
  Branch (85:13): [Folded - Ignored]
  Branch (85:13): [True: 5, False: 0]
  Branch (85:13): [True: 3, False: 0]
  Branch (85:13): [True: 58, False: 0]
  Branch (85:13): [True: 104, False: 0]
  Branch (85:13): [True: 5.01k, False: 0]
  Branch (85:13): [True: 6, False: 0]
  Branch (85:13): [True: 12, False: 0]
86
0
            return Err((
87
0
                make_err!(Code::Internal, "Could not convert usize to u64"),
88
0
                buf,
89
0
            ));
90
        };
91
17.6k
        if buf_len == 0 {
  Branch (91:12): [True: 0, False: 13]
  Branch (91:12): [True: 0, False: 8.74k]
  Branch (91:12): [Folded - Ignored]
  Branch (91:12): [True: 0, False: 64]
  Branch (91:12): [True: 0, False: 2.02k]
  Branch (91:12): [True: 0, False: 1]
  Branch (91:12): [True: 0, False: 24]
  Branch (91:12): [True: 0, False: 5]
  Branch (91:12): [True: 0, False: 111]
  Branch (91:12): [True: 0, False: 4]
  Branch (91:12): [True: 0, False: 6]
  Branch (91:12): [True: 0, False: 1.06k]
  Branch (91:12): [True: 0, False: 0]
  Branch (91:12): [True: 0, False: 0]
  Branch (91:12): [True: 0, False: 406]
  Branch (91:12): [Folded - Ignored]
  Branch (91:12): [True: 0, False: 5]
  Branch (91:12): [True: 0, False: 3]
  Branch (91:12): [True: 0, False: 58]
  Branch (91:12): [True: 0, False: 104]
  Branch (91:12): [True: 0, False: 5.01k]
  Branch (91:12): [True: 0, False: 6]
  Branch (91:12): [True: 0, False: 12]
92
0
            return Err((
93
0
                make_input_err!("Cannot send EOF in send(). Instead use send_eof()"),
94
0
                buf,
95
0
            ));
96
17.6k
        }
97
17.6k
        if let Err(
err0
) = tx.send(buf).
await1.32k
{
  Branch (97:16): [True: 0, False: 13]
  Branch (97:16): [True: 0, False: 8.74k]
  Branch (97:16): [Folded - Ignored]
  Branch (97:16): [True: 0, False: 64]
  Branch (97:16): [True: 0, False: 2.02k]
  Branch (97:16): [True: 0, False: 1]
  Branch (97:16): [True: 0, False: 24]
  Branch (97:16): [True: 0, False: 5]
  Branch (97:16): [True: 0, False: 111]
  Branch (97:16): [True: 0, False: 4]
  Branch (97:16): [True: 0, False: 6]
  Branch (97:16): [True: 0, False: 1.06k]
  Branch (97:16): [True: 0, False: 0]
  Branch (97:16): [True: 0, False: 0]
  Branch (97:16): [True: 0, False: 406]
  Branch (97:16): [Folded - Ignored]
  Branch (97:16): [True: 0, False: 5]
  Branch (97:16): [True: 0, False: 3]
  Branch (97:16): [True: 0, False: 58]
  Branch (97:16): [True: 0, False: 104]
  Branch (97:16): [True: 0, False: 5.01k]
  Branch (97:16): [True: 0, False: 6]
  Branch (97:16): [True: 0, False: 12]
98
            // Close our channel.
99
0
            self.tx = None;
100
0
            return Err((
101
0
                make_err!(
102
0
                    Code::Internal,
103
0
                    "Failed to write to data, receiver disconnected"
104
0
                ),
105
0
                err.0,
106
0
            ));
107
17.6k
        }
108
17.6k
        self.bytes_written += buf_len;
109
17.6k
        Ok(())
110
17.6k
    }
111
112
    /// Binds a reader and a writer together. This will send all the data from the reader
113
    /// to the writer until an EOF is received.
114
    /// This will always read one message ahead to ensure that if an error happens
115
    /// on the EOF message it will not forward on the last payload message and instead
116
    /// forward on the error.
117
2
    pub async fn bind_buffered(&mut self, reader: &mut DropCloserReadHalf) -> Result<(), Error> {
118
        loop {
119
52
            let chunk = reader
120
52
                .recv()
121
3
                .await
122
52
                .err_tip(|| 
"In DropCloserWriteHalf::bind_buffered::recv"0
)
?0
;
123
52
            if chunk.is_empty() {
  Branch (123:16): [True: 0, False: 0]
  Branch (123:16): [Folded - Ignored]
  Branch (123:16): [True: 0, False: 1]
  Branch (123:16): [Folded - Ignored]
  Branch (123:16): [True: 1, False: 50]
124
1
                self.send_eof()
125
1
                    .err_tip(|| 
"In DropCloserWriteHalf::bind_buffered::send_eof"0
)
?0
;
126
1
                break; // EOF.
127
51
            }
128
51
            // Always read one message ahead so if we get an error on our EOF
129
51
            // we forward it on to the reader.
130
51
            if reader.peek().
await2
.is_err() {
  Branch (130:16): [True: 0, False: 0]
  Branch (130:16): [Folded - Ignored]
  Branch (130:16): [True: 1, False: 0]
  Branch (130:16): [Folded - Ignored]
  Branch (130:16): [True: 0, False: 50]
131
                // Read our next message for good book keeping.
132
1
                let _ = reader
133
1
                    .recv()
134
0
                    .await
135
1
                    .err_tip(|| "In DropCloserWriteHalf::bind_buffered::peek::eof")?;
136
0
                return Err(make_err!(
137
0
                    Code::Internal,
138
0
                    "DropCloserReadHalf::peek() said error, but when data received said Ok. This should never happen."
139
0
                ));
140
50
            }
141
50
            match self.send_get_bytes_on_error(chunk).
await47
{
142
50
                Ok(()) => {}
143
0
                Err(e) => {
144
0
                    reader.queued_data.push_front(e.1);
145
0
                    return Err(e.0).err_tip(|| "In DropCloserWriteHalf::bind_buffered::send");
146
                }
147
            }
148
        }
149
1
        Ok(())
150
2
    }
151
152
    /// Sends an EOF (End of File) message to the receiver which will gracefully let the
153
    /// stream know it has no more data. This will close the stream.
154
13.5k
    pub fn send_eof(&mut self) -> Result<(), Error> {
155
13.5k
        // Flag that we have sent the EOF.
156
13.5k
        let eof_was_sent = self.eof_sent.swap(true, Ordering::Release);
157
13.5k
        if eof_was_sent {
  Branch (157:12): [True: 1, False: 13.5k]
  Branch (157:12): [Folded - Ignored]
158
1
            event!(
159
1
                Level::WARN,
160
1
                "Stream already closed when eof already was sent. This is often ok for retry was triggered, but should not happen on happy path."
161
            );
162
1
            return Ok(());
163
13.5k
        }
164
13.5k
165
13.5k
        // Now close our stream.
166
13.5k
        self.tx = None;
167
13.5k
        Ok(())
168
13.5k
    }
169
170
    /// Returns the number of bytes written so far. This does not mean the receiver received
171
    /// all of the bytes written to the stream so far.
172
    #[must_use]
173
115
    pub const fn get_bytes_written(&self) -> u64 {
174
115
        self.bytes_written
175
115
    }
176
177
    /// Returns if the pipe was broken. This is good for determining if the reader broke the
178
    /// pipe or the writer broke the pipe, since this will only return true if the pipe was
179
    /// broken by the writer.
180
    #[must_use]
181
2
    pub const fn is_pipe_broken(&self) -> bool {
182
2
        self.tx.is_none()
183
2
    }
184
}
185
186
/// Reader half of the pair.
187
pub struct DropCloserReadHalf {
188
    rx: mpsc::Receiver<Bytes>,
189
    /// Number of bytes received over the stream.
190
    bytes_received: u64,
191
    eof_sent: Arc<AtomicBool>,
192
    /// If there was an error in the stream, this will be set to the last error.
193
    last_err: Option<Error>,
194
    /// If not empty, this is the data that needs to be sent out before
195
    /// data from the underlying channel can should be sent.
196
    queued_data: VecDeque<Bytes>,
197
    /// As data is being read from the stream, this buffer will be filled
198
    /// with the most recent data. Once `max_recent_data_size` is reached
199
    /// this buffer will be cleared and no longer be populated.
200
    /// This is useful if the caller wants to reset the the reader to before
201
    /// any of the data was received if possible (eg: something failed and
202
    /// we want to retry).
203
    recent_data: Vec<Bytes>,
204
    /// Amount of data to keep in the recent_data buffer before clearing it
205
    /// and no longer populating it.
206
    max_recent_data_size: u64,
207
}
208
209
impl DropCloserReadHalf {
210
    /// Returns if the stream has data ready.
211
0
    pub fn is_empty(&self) -> bool {
212
0
        self.rx.is_empty()
213
0
    }
214
215
30.7k
    fn recv_inner(&mut self, chunk: Bytes) -> Result<Bytes, Error> {
216
30.7k
        // `queued_data` is allowed to have empty bytes that represent EOF
217
30.7k
        if chunk.is_empty() {
  Branch (217:12): [True: 13.0k, False: 17.6k]
  Branch (217:12): [Folded - Ignored]
218
13.0k
            if !self.eof_sent.load(Ordering::Acquire) {
  Branch (218:16): [True: 19, False: 13.0k]
  Branch (218:16): [Folded - Ignored]
219
19
                let err = make_err!(Code::Internal, "Sender dropped before sending EOF");
220
19
                self.queued_data.clear();
221
19
                self.recent_data.clear();
222
19
                self.bytes_received = 0;
223
19
                self.last_err = Some(err.clone());
224
19
                return Err(err);
225
13.0k
            };
226
13.0k
227
13.0k
            self.maybe_populate_recent_data(&ZERO_DATA);
228
13.0k
            return Ok(ZERO_DATA);
229
17.6k
        };
230
17.6k
231
17.6k
        self.bytes_received += chunk.len() as u64;
232
17.6k
        self.maybe_populate_recent_data(&chunk);
233
17.6k
        Ok(chunk)
234
30.7k
    }
235
236
    /// Try to receive a chunk of data, returning `None` if none is available.
237
54.0k
    pub fn try_recv(&mut self) -> Option<Result<Bytes, Error>> {
238
54.0k
        if let Some(
err2
) = &self.last_err {
  Branch (238:16): [True: 2, False: 54.0k]
  Branch (238:16): [Folded - Ignored]
239
2
            return Some(Err(err.clone()));
240
54.0k
        }
241
54.0k
        self.queued_data.pop_front().map(Ok)
242
54.0k
    }
243
244
    /// Receive a chunk of data, waiting asynchronously until some is available.
245
54.0k
    pub async fn recv(&mut self) -> Result<Bytes, Error> {
246
54.0k
        if let Some(
result23.2k
) = self.try_recv() {
  Branch (246:16): [True: 9.76k, False: 5]
  Branch (246:16): [True: 10.0k, False: 14.2k]
  Branch (246:16): [True: 0, False: 112]
  Branch (246:16): [True: 0, False: 2]
  Branch (246:16): [True: 0, False: 6]
  Branch (246:16): [True: 2.10k, False: 3.20k]
  Branch (246:16): [True: 0, False: 0]
  Branch (246:16): [True: 0, False: 35]
  Branch (246:16): [True: 571, False: 2.79k]
  Branch (246:16): [True: 705, False: 9.02k]
  Branch (246:16): [True: 0, False: 2]
  Branch (246:16): [True: 0, False: 24]
  Branch (246:16): [True: 5, False: 1.10k]
  Branch (246:16): [True: 0, False: 0]
  Branch (246:16): [True: 0, False: 32]
  Branch (246:16): [True: 0, False: 7]
  Branch (246:16): [Folded - Ignored]
  Branch (246:16): [True: 1, False: 9]
  Branch (246:16): [True: 0, False: 4]
  Branch (246:16): [True: 0, False: 130]
  Branch (246:16): [True: 54, False: 60]
  Branch (246:16): [True: 0, False: 10]
  Branch (246:16): [True: 0, False: 8]
  Branch (246:16): [True: 0, False: 0]
247
23.2k
            result
248
        } else {
249
            // `None` here indicates EOF, which we represent as Zero data
250
30.7k
            let 
data30.7k
= self.rx.recv().
await12.8k
.unwrap_or(ZERO_DATA);
251
30.7k
            self.recv_inner(data)
252
        }
253
54.0k
    }
254
255
30.7k
    /// Records `chunk` in the recent-data buffer while the number of bytes
    /// received is still within `max_recent_data_size`; once that limit is
    /// exceeded the buffer is emptied instead. Does nothing when the limit is 0.
    fn maybe_populate_recent_data(&mut self, chunk: &Bytes) {
        if self.max_recent_data_size == 0 {
            return; // Fast path.
        }
        if self.bytes_received <= self.max_recent_data_size {
            self.recent_data.push(chunk.clone());
        } else {
            // Past the limit: whatever was kept can no longer represent the
            // whole stream, so let it go.
            self.recent_data.clear();
        }
    }
267
268
    /// Sets the maximum size of the recent_data buffer. If the number of bytes
269
    /// received exceeds this size, the recent_data buffer will be cleared and
270
    /// no longer populated.
271
2
    pub fn set_max_recent_data_size(&mut self, size: u64) {
272
2
        self.max_recent_data_size = size;
273
2
    }
274
275
    /// Attempts to reset the stream to before any data was received. This will
276
    /// only work if the number of bytes received is less than max_recent_data_size.
277
    ///
278
    /// On error the state of the stream is undefined and the caller should not
279
    /// attempt to use the stream again.
280
1
    pub fn try_reset_stream(&mut self) -> Result<(), Error> {
281
1
        if self.bytes_received > self.max_recent_data_size {
  Branch (281:12): [True: 0, False: 1]
  Branch (281:12): [Folded - Ignored]
282
0
            return Err(make_err!(
283
0
                Code::Internal,
284
0
                "Cannot reset stream, max_recent_data_size exceeded"
285
0
            ));
286
1
        }
287
1
        let mut data_sum = 0;
288
1
        for 
chunk0
in self.recent_data.drain(..).rev() {
289
0
            data_sum += chunk.len() as u64;
290
0
            self.queued_data.push_front(chunk);
291
0
        }
292
1
        assert!(self.recent_data.is_empty(), 
"Recent_data should be empty"0
);
293
        // Ensure the sum of the bytes in recent_data is equal to the bytes_received.
294
0
        error_if!(
295
1
            data_sum != self.bytes_received,
  Branch (295:13): [True: 0, False: 1]
  Branch (295:13): [Folded - Ignored]
296
            "Sum of recent_data bytes does not equal bytes_received"
297
        );
298
1
        self.bytes_received = 0;
299
1
        Ok(())
300
1
    }
301
302
    /// Drains the reader until an EOF is received, but sends data to the void.
303
1.00k
    pub async fn drain(&mut self) -> Result<(), Error> {
304
        loop {
305
1.00k
            if self
  Branch (305:16): [True: 2, False: 0]
  Branch (305:16): [Folded - Ignored]
  Branch (305:16): [True: 1.00k, False: 0]
  Branch (305:16): [True: 0, False: 0]
  Branch (305:16): [True: 1, False: 2]
  Branch (305:16): [Folded - Ignored]
306
1.00k
                .recv()
307
6
                .await
308
1.00k
                .err_tip(|| 
"Failed to drain in buf_channel::drain"0
)
?0
309
1.00k
                .is_empty()
310
            {
311
1.00k
                break; // EOF.
312
2
            }
313
        }
314
1.00k
        Ok(())
315
1.00k
    }
316
317
    /// Peek the next set of bytes in the stream without consuming them.
318
13.4k
    pub async fn peek(&mut self) -> Result<&Bytes, Error> {
319
13.4k
        if self.queued_data.is_empty() {
  Branch (319:12): [True: 2, False: 0]
  Branch (319:12): [True: 6.93k, False: 0]
  Branch (319:12): [Folded - Ignored]
  Branch (319:12): [True: 1, False: 0]
  Branch (319:12): [True: 1.36k, False: 0]
  Branch (319:12): [True: 0, False: 0]
  Branch (319:12): [True: 17, False: 0]
  Branch (319:12): [True: 1.02k, False: 0]
  Branch (319:12): [True: 3.99k, False: 0]
  Branch (319:12): [True: 1, False: 0]
  Branch (319:12): [True: 10, False: 0]
  Branch (319:12): [True: 8, False: 0]
  Branch (319:12): [True: 0, False: 0]
  Branch (319:12): [True: 8, False: 0]
  Branch (319:12): [True: 2, False: 0]
  Branch (319:12): [Folded - Ignored]
  Branch (319:12): [True: 4, False: 0]
  Branch (319:12): [True: 2, False: 0]
  Branch (319:12): [True: 25, False: 0]
  Branch (319:12): [True: 53, False: 0]
  Branch (319:12): [True: 5, False: 0]
  Branch (319:12): [True: 4, False: 0]
  Branch (319:12): [True: 0, False: 0]
320
13.4k
            let 
chunk13.4k
= self.recv().
await414
.err_tip(||
"In buf_channel::peek"5
)
?5
;
321
13.4k
            self.queued_data.push_front(chunk);
322
0
        }
323
13.4k
        Ok(self
324
13.4k
            .queued_data
325
13.4k
            .front()
326
13.4k
            .expect("Should have data in the queue"))
327
13.4k
    }
328
329
    /// The number of bytes received over this stream so far.
330
0
    pub fn get_bytes_received(&self) -> u64 {
331
0
        self.bytes_received
332
0
    }
333
334
    /// Takes exactly `size` number of bytes from the stream and returns them.
335
    /// This means the stream will keep polling until either an EOF is received or
336
    /// `size` bytes are received and concat them all together then return them.
337
    /// This method is optimized to reduce copies when possible.
338
    /// If `size` is None, it will take all the bytes in the stream.
339
34.2k
    pub async fn consume(&mut self, size: Option<usize>) -> Result<Bytes, Error> {
340
34.2k
        let size = size.unwrap_or(usize::MAX);
341
1.73k
        let first_chunk = {
342
34.2k
            let 
mut chunk34.2k
= self
343
34.2k
                .recv()
344
10.2k
                .await
345
34.2k
                .err_tip(|| 
"During first read of buf_channel::take()"9
)
?9
;
346
34.2k
            if chunk.is_empty() {
  Branch (346:16): [True: 2, False: 9.76k]
  Branch (346:16): [True: 62, False: 16.9k]
  Branch (346:16): [Folded - Ignored]
  Branch (346:16): [True: 0, False: 1]
  Branch (346:16): [True: 0, False: 2.00k]
  Branch (346:16): [True: 0, False: 0]
  Branch (346:16): [True: 0, False: 17]
  Branch (346:16): [True: 207, False: 1.02k]
  Branch (346:16): [True: 60, False: 3.99k]
  Branch (346:16): [True: 0, False: 1]
  Branch (346:16): [True: 0, False: 10]
  Branch (346:16): [True: 1, False: 7]
  Branch (346:16): [True: 0, False: 0]
  Branch (346:16): [True: 8, False: 8]
  Branch (346:16): [True: 2, False: 2]
  Branch (346:16): [Folded - Ignored]
  Branch (346:16): [True: 0, False: 4]
  Branch (346:16): [True: 0, False: 2]
  Branch (346:16): [True: 0, False: 25]
  Branch (346:16): [True: 4, False: 5]
  Branch (346:16): [True: 0, False: 5]
  Branch (346:16): [True: 0, False: 4]
  Branch (346:16): [True: 0, False: 0]
347
346
                return Ok(chunk); // EOF.
348
33.8k
            }
349
33.8k
            if chunk.len() > size {
  Branch (349:16): [True: 9.76k, False: 2]
  Branch (349:16): [True: 10.0k, False: 6.93k]
  Branch (349:16): [Folded - Ignored]
  Branch (349:16): [True: 0, False: 1]
  Branch (349:16): [True: 645, False: 1.36k]
  Branch (349:16): [True: 0, False: 0]
  Branch (349:16): [True: 0, False: 17]
  Branch (349:16): [True: 0, False: 1.02k]
  Branch (349:16): [True: 0, False: 3.99k]
  Branch (349:16): [True: 0, False: 1]
  Branch (349:16): [True: 0, False: 10]
  Branch (349:16): [True: 0, False: 7]
  Branch (349:16): [True: 0, False: 0]
  Branch (349:16): [True: 0, False: 8]
  Branch (349:16): [True: 0, False: 2]
  Branch (349:16): [Folded - Ignored]
  Branch (349:16): [True: 0, False: 4]
  Branch (349:16): [True: 0, False: 2]
  Branch (349:16): [True: 0, False: 25]
  Branch (349:16): [True: 2, False: 3]
  Branch (349:16): [True: 0, False: 5]
  Branch (349:16): [True: 0, False: 4]
  Branch (349:16): [True: 0, False: 0]
350
20.4k
                let remaining = chunk.split_off(size);
351
20.4k
                self.queued_data.push_front(remaining);
352
20.4k
                // No need to read EOF if we are a partial chunk.
353
20.4k
                return Ok(chunk);
354
13.4k
            }
355
13.4k
            // Try to read our EOF to ensure our sender did not error out.
356
13.4k
            match self.peek().
await411
{
357
13.3k
                Ok(peeked_chunk) => {
358
13.3k
                    if peeked_chunk.is_empty() || 
chunk.len() == size1.91k
{
  Branch (358:24): [True: 2, False: 0]
  Branch (358:51): [True: 0, False: 0]
  Branch (358:24): [True: 6.91k, False: 13]
  Branch (358:51): [True: 0, False: 13]
  Branch (358:24): [Folded - Ignored]
  Branch (358:51): [Folded - Ignored]
  Branch (358:24): [True: 1, False: 0]
  Branch (358:51): [True: 0, False: 0]
  Branch (358:24): [True: 748, False: 616]
  Branch (358:51): [True: 180, False: 436]
  Branch (358:24): [True: 0, False: 0]
  Branch (358:51): [True: 0, False: 0]
  Branch (358:24): [True: 17, False: 0]
  Branch (358:51): [True: 0, False: 0]
  Branch (358:24): [True: 454, False: 571]
  Branch (358:51): [True: 0, False: 571]
  Branch (358:24): [True: 3.28k, False: 705]
  Branch (358:51): [True: 0, False: 705]
  Branch (358:24): [True: 1, False: 0]
  Branch (358:51): [True: 0, False: 0]
  Branch (358:24): [True: 10, False: 0]
  Branch (358:51): [True: 0, False: 0]
  Branch (358:24): [True: 3, False: 4]
  Branch (358:51): [True: 1, False: 3]
  Branch (358:24): [True: 0, False: 0]
  Branch (358:51): [True: 0, False: 0]
  Branch (358:24): [True: 8, False: 0]
  Branch (358:51): [True: 0, False: 0]
  Branch (358:24): [True: 2, False: 0]
  Branch (358:51): [True: 0, False: 0]
  Branch (358:24): [Folded - Ignored]
  Branch (358:51): [Folded - Ignored]
  Branch (358:24): [True: 2, False: 2]
  Branch (358:51): [True: 1, False: 1]
  Branch (358:24): [True: 2, False: 0]
  Branch (358:51): [True: 0, False: 0]
  Branch (358:24): [True: 25, False: 0]
  Branch (358:51): [True: 0, False: 0]
  Branch (358:24): [True: 2, False: 1]
  Branch (358:51): [True: 0, False: 1]
  Branch (358:24): [True: 5, False: 0]
  Branch (358:51): [True: 0, False: 0]
  Branch (358:24): [True: 4, False: 0]
  Branch (358:51): [True: 0, False: 0]
  Branch (358:24): [True: 0, False: 0]
  Branch (358:51): [True: 0, False: 0]
359
11.6k
                        return Ok(chunk);
360
1.73k
                    }
361
                }
362
4
                Err(e) => {
363
4
                    return Err(e.clone()).err_tip(|| "Failed to check if next chunk is EOF")?
364
                }
365
            }
366
1.73k
            chunk
367
1.73k
        };
368
1.73k
        let mut output = BytesMut::new();
369
1.73k
        output.extend_from_slice(&first_chunk);
370
371
        loop {
372
4.94k
            let 
mut chunk4.94k
= self
373
4.94k
                .recv()
374
2.21k
                .await
375
4.94k
                .err_tip(|| 
"During first read of buf_channel::take()"1
)
?1
;
376
4.94k
            if chunk.is_empty() {
  Branch (376:16): [True: 0, False: 0]
  Branch (376:16): [True: 13, False: 168]
  Branch (376:16): [Folded - Ignored]
  Branch (376:16): [True: 0, False: 0]
  Branch (376:16): [True: 180, False: 746]
  Branch (376:16): [True: 0, False: 0]
  Branch (376:16): [True: 0, False: 0]
  Branch (376:16): [True: 271, False: 834]
  Branch (376:16): [True: 354, False: 1.33k]
  Branch (376:16): [True: 0, False: 0]
  Branch (376:16): [True: 0, False: 0]
  Branch (376:16): [True: 2, False: 1.04k]
  Branch (376:16): [True: 0, False: 0]
  Branch (376:16): [True: 0, False: 0]
  Branch (376:16): [True: 0, False: 0]
  Branch (376:16): [Folded - Ignored]
  Branch (376:16): [True: 0, False: 1]
  Branch (376:16): [True: 0, False: 0]
  Branch (376:16): [True: 0, False: 0]
  Branch (376:16): [True: 0, False: 1]
  Branch (376:16): [True: 0, False: 0]
  Branch (376:16): [True: 0, False: 0]
  Branch (376:16): [True: 0, False: 0]
377
820
                break; // EOF.
378
4.12k
            }
379
4.12k
            if output.len() + chunk.len() > size {
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 168]
  Branch (379:16): [Folded - Ignored]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 99, False: 647]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 834]
  Branch (379:16): [True: 0, False: 1.33k]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 1.04k]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [Folded - Ignored]
  Branch (379:16): [True: 0, False: 1]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 1]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 0]
  Branch (379:16): [True: 0, False: 0]
380
99
                // Slice off the extra data and put it back into the queue. We are done.
381
99
                let remaining = chunk.split_off(size - output.len());
382
99
                self.queued_data.push_front(remaining);
383
4.02k
            }
384
4.12k
            output.extend_from_slice(&chunk);
385
4.12k
            if output.len() == size {
  Branch (385:16): [True: 0, False: 0]
  Branch (385:16): [True: 0, False: 168]
  Branch (385:16): [Folded - Ignored]
  Branch (385:16): [True: 0, False: 0]
  Branch (385:16): [True: 256, False: 490]
  Branch (385:16): [True: 0, False: 0]
  Branch (385:16): [True: 0, False: 0]
  Branch (385:16): [True: 300, False: 534]
  Branch (385:16): [True: 350, False: 982]
  Branch (385:16): [True: 0, False: 0]
  Branch (385:16): [True: 0, False: 0]
  Branch (385:16): [True: 1, False: 1.03k]
  Branch (385:16): [True: 0, False: 0]
  Branch (385:16): [True: 0, False: 0]
  Branch (385:16): [True: 0, False: 0]
  Branch (385:16): [Folded - Ignored]
  Branch (385:16): [True: 1, False: 0]
  Branch (385:16): [True: 0, False: 0]
  Branch (385:16): [True: 0, False: 0]
  Branch (385:16): [True: 1, False: 0]
  Branch (385:16): [True: 0, False: 0]
  Branch (385:16): [True: 0, False: 0]
  Branch (385:16): [True: 0, False: 0]
386
909
                break; // We are done.
387
3.21k
            }
388
        }
389
1.72k
        Ok(output.freeze())
390
34.2k
    }
391
}
392
393
impl Stream for DropCloserReadHalf {
394
    type Item = Result<Bytes, std::io::Error>;
395
396
    // TODO(blaise.bruer) This is not very efficient as we are creating a new future on every
397
    // poll() call. It might be better to use a waker.
398
112
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
399
112
        Box::pin(self.recv())
400
112
            .as_mut()
401
112
            .poll(cx)
402
112
            .map(|result| 
match result80
{
403
79
                Ok(bytes) => {
404
79
                    if bytes.is_empty() {
  Branch (404:24): [True: 13, False: 66]
  Branch (404:24): [Folded - Ignored]
405
13
                        return None;
406
66
                    }
407
66
                    Some(Ok(bytes))
408
                }
409
1
                Err(e) => Some(Err(e.to_std_err())),
410
112
            
}80
)
411
112
    }
412
}