/build/source/nativelink-util/src/buf_channel.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::collections::VecDeque; |
16 | | use std::pin::Pin; |
17 | | use std::sync::atomic::{AtomicBool, Ordering}; |
18 | | use std::sync::Arc; |
19 | | use std::task::Poll; |
20 | | |
21 | | use bytes::{Bytes, BytesMut}; |
22 | | use futures::task::Context; |
23 | | use futures::{Future, Stream, TryFutureExt}; |
24 | | use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt}; |
25 | | use tokio::sync::mpsc; |
26 | | use tracing::{event, Level}; |
27 | | |
/// Empty buffer used as the in-band end-of-stream (EOF) marker: a closed
/// channel is reported to readers as this zero-length chunk.
const ZERO_DATA: Bytes = Bytes::new();
29 | | |
30 | | /// Create a channel pair that can be used to transport buffer objects around to |
31 | | /// different components. This wrapper is used because the streams give some |
32 | | /// utility like managing EOF in a more friendly way, ensure if no EOF is received |
33 | | /// it will send an error to the receiver channel before shutting down and count |
34 | | /// the number of bytes sent. |
35 | | #[must_use] |
36 | 13.5k | pub fn make_buf_channel_pair() -> (DropCloserWriteHalf, DropCloserReadHalf) { |
37 | 13.5k | // We allow up to 2 items in the buffer at any given time. There is no major |
38 | 13.5k | // reason behind this magic number other than thinking it will be nice to give |
39 | 13.5k | // a little time for another thread to wake up and consume data if another |
40 | 13.5k | // thread is pumping large amounts of data into the channel. |
41 | 13.5k | let (tx, rx) = mpsc::channel(2); |
42 | 13.5k | let eof_sent = Arc::new(AtomicBool::new(false)); |
43 | 13.5k | ( |
44 | 13.5k | DropCloserWriteHalf { |
45 | 13.5k | tx: Some(tx), |
46 | 13.5k | bytes_written: 0, |
47 | 13.5k | eof_sent: eof_sent.clone(), |
48 | 13.5k | }, |
49 | 13.5k | DropCloserReadHalf { |
50 | 13.5k | rx, |
51 | 13.5k | queued_data: VecDeque::new(), |
52 | 13.5k | last_err: None, |
53 | 13.5k | eof_sent, |
54 | 13.5k | bytes_received: 0, |
55 | 13.5k | recent_data: Vec::new(), |
56 | 13.5k | max_recent_data_size: 0, |
57 | 13.5k | }, |
58 | 13.5k | ) |
59 | 13.5k | } |
60 | | |
61 | | /// Writer half of the pair. |
62 | | pub struct DropCloserWriteHalf { |
63 | | tx: Option<mpsc::Sender<Bytes>>, |
64 | | bytes_written: u64, |
65 | | eof_sent: Arc<AtomicBool>, |
66 | | } |
67 | | |
68 | | impl DropCloserWriteHalf { |
69 | | /// Sends data over the channel to the receiver. |
70 | 17.6k | pub fn send(&mut self, buf: Bytes) -> impl Future<Output = Result<(), Error>> + '_ { |
71 | 17.6k | self.send_get_bytes_on_error(buf).map_err(|err| err.00 ) |
72 | 17.6k | } |
73 | | |
74 | | /// Sends data over the channel to the receiver. |
75 | | #[inline] |
76 | 17.6k | async fn send_get_bytes_on_error(&mut self, buf: Bytes) -> Result<(), (Error, Bytes)> { |
77 | 17.6k | let tx = match self |
78 | 17.6k | .tx |
79 | 17.6k | .as_ref() |
80 | 17.6k | .ok_or_else(|| make_err!(Code::Internal, "Tried to send while stream is closed")0 ) |
81 | | { |
82 | 17.6k | Ok(tx) => tx, |
83 | 0 | Err(e) => return Err((e, buf)), |
84 | | }; |
85 | 17.6k | let Ok(buf_len) = u64::try_from(buf.len()) else { Branch (85:13): [True: 13, False: 0]
Branch (85:13): [True: 8.74k, False: 0]
Branch (85:13): [Folded - Ignored]
Branch (85:13): [True: 64, False: 0]
Branch (85:13): [True: 2.02k, False: 0]
Branch (85:13): [True: 1, False: 0]
Branch (85:13): [True: 24, False: 0]
Branch (85:13): [True: 5, False: 0]
Branch (85:13): [True: 111, False: 0]
Branch (85:13): [True: 4, False: 0]
Branch (85:13): [True: 6, False: 0]
Branch (85:13): [True: 1.07k, False: 0]
Branch (85:13): [True: 0, False: 0]
Branch (85:13): [True: 0, False: 0]
Branch (85:13): [True: 406, False: 0]
Branch (85:13): [Folded - Ignored]
Branch (85:13): [True: 5, False: 0]
Branch (85:13): [True: 3, False: 0]
Branch (85:13): [True: 58, False: 0]
Branch (85:13): [True: 104, False: 0]
Branch (85:13): [True: 5.01k, False: 0]
Branch (85:13): [True: 6, False: 0]
Branch (85:13): [True: 12, False: 0]
|
86 | 0 | return Err(( |
87 | 0 | make_err!(Code::Internal, "Could not convert usize to u64"), |
88 | 0 | buf, |
89 | 0 | )); |
90 | | }; |
91 | 17.6k | if buf_len == 0 { Branch (91:12): [True: 0, False: 13]
Branch (91:12): [True: 0, False: 8.74k]
Branch (91:12): [Folded - Ignored]
Branch (91:12): [True: 0, False: 64]
Branch (91:12): [True: 0, False: 2.02k]
Branch (91:12): [True: 0, False: 1]
Branch (91:12): [True: 0, False: 24]
Branch (91:12): [True: 0, False: 5]
Branch (91:12): [True: 0, False: 111]
Branch (91:12): [True: 0, False: 4]
Branch (91:12): [True: 0, False: 6]
Branch (91:12): [True: 0, False: 1.07k]
Branch (91:12): [True: 0, False: 0]
Branch (91:12): [True: 0, False: 0]
Branch (91:12): [True: 0, False: 406]
Branch (91:12): [Folded - Ignored]
Branch (91:12): [True: 0, False: 5]
Branch (91:12): [True: 0, False: 3]
Branch (91:12): [True: 0, False: 58]
Branch (91:12): [True: 0, False: 104]
Branch (91:12): [True: 0, False: 5.01k]
Branch (91:12): [True: 0, False: 6]
Branch (91:12): [True: 0, False: 12]
|
92 | 0 | return Err(( |
93 | 0 | make_input_err!("Cannot send EOF in send(). Instead use send_eof()"), |
94 | 0 | buf, |
95 | 0 | )); |
96 | 17.6k | } |
97 | 17.6k | if let Err(err0 ) = tx.send(buf).await { Branch (97:16): [True: 0, False: 13]
Branch (97:16): [True: 0, False: 8.74k]
Branch (97:16): [Folded - Ignored]
Branch (97:16): [True: 0, False: 64]
Branch (97:16): [True: 0, False: 2.02k]
Branch (97:16): [True: 0, False: 1]
Branch (97:16): [True: 0, False: 24]
Branch (97:16): [True: 0, False: 5]
Branch (97:16): [True: 0, False: 111]
Branch (97:16): [True: 0, False: 4]
Branch (97:16): [True: 0, False: 6]
Branch (97:16): [True: 0, False: 1.07k]
Branch (97:16): [True: 0, False: 0]
Branch (97:16): [True: 0, False: 0]
Branch (97:16): [True: 0, False: 406]
Branch (97:16): [Folded - Ignored]
Branch (97:16): [True: 0, False: 5]
Branch (97:16): [True: 0, False: 3]
Branch (97:16): [True: 0, False: 58]
Branch (97:16): [True: 0, False: 104]
Branch (97:16): [True: 0, False: 5.01k]
Branch (97:16): [True: 0, False: 6]
Branch (97:16): [True: 0, False: 12]
|
98 | | // Close our channel. |
99 | 0 | self.tx = None; |
100 | 0 | return Err(( |
101 | 0 | make_err!( |
102 | 0 | Code::Internal, |
103 | 0 | "Failed to write to data, receiver disconnected" |
104 | 0 | ), |
105 | 0 | err.0, |
106 | 0 | )); |
107 | 17.6k | } |
108 | 17.6k | self.bytes_written += buf_len; |
109 | 17.6k | Ok(()) |
110 | 17.6k | } |
111 | | |
112 | | /// Binds a reader and a writer together. This will send all the data from the reader |
113 | | /// to the writer until an EOF is received. |
114 | | /// This will always read one message ahead to ensure that if an error happens |
115 | | /// on the EOF message it will not forward on the last payload message and instead |
116 | | /// forward on the error. |
117 | 2 | pub async fn bind_buffered(&mut self, reader: &mut DropCloserReadHalf) -> Result<(), Error> { |
118 | | loop { |
119 | 52 | let chunk = reader |
120 | 52 | .recv() |
121 | 52 | .await |
122 | 52 | .err_tip(|| "In DropCloserWriteHalf::bind_buffered::recv"0 )?0 ; |
123 | 52 | if chunk.is_empty() { Branch (123:16): [True: 0, False: 0]
Branch (123:16): [Folded - Ignored]
Branch (123:16): [True: 0, False: 1]
Branch (123:16): [Folded - Ignored]
Branch (123:16): [True: 1, False: 50]
|
124 | 1 | self.send_eof() |
125 | 1 | .err_tip(|| "In DropCloserWriteHalf::bind_buffered::send_eof"0 )?0 ; |
126 | 1 | break; // EOF. |
127 | 51 | } |
128 | 51 | // Always read one message ahead so if we get an error on our EOF |
129 | 51 | // we forward it on to the reader. |
130 | 51 | if reader.peek().await.is_err() { Branch (130:16): [True: 0, False: 0]
Branch (130:16): [Folded - Ignored]
Branch (130:16): [True: 1, False: 0]
Branch (130:16): [Folded - Ignored]
Branch (130:16): [True: 0, False: 50]
|
131 | | // Read our next message for good book keeping. |
132 | 1 | let _ = reader |
133 | 1 | .recv() |
134 | 1 | .await |
135 | 1 | .err_tip(|| "In DropCloserWriteHalf::bind_buffered::peek::eof")?; |
136 | 0 | return Err(make_err!( |
137 | 0 | Code::Internal, |
138 | 0 | "DropCloserReadHalf::peek() said error, but when data received said Ok. This should never happen." |
139 | 0 | )); |
140 | 50 | } |
141 | 50 | match self.send_get_bytes_on_error(chunk).await { |
142 | 50 | Ok(()) => {} |
143 | 0 | Err(e) => { |
144 | 0 | reader.queued_data.push_front(e.1); |
145 | 0 | return Err(e.0).err_tip(|| "In DropCloserWriteHalf::bind_buffered::send"); |
146 | | } |
147 | | } |
148 | | } |
149 | 1 | Ok(()) |
150 | 2 | } |
151 | | |
152 | | /// Sends an EOF (End of File) message to the receiver which will gracefully let the |
153 | | /// stream know it has no more data. This will close the stream. |
154 | 13.5k | pub fn send_eof(&mut self) -> Result<(), Error> { |
155 | 13.5k | // Flag that we have sent the EOF. |
156 | 13.5k | let eof_was_sent = self.eof_sent.swap(true, Ordering::Release); |
157 | 13.5k | if eof_was_sent { Branch (157:12): [True: 1, False: 13.5k]
Branch (157:12): [Folded - Ignored]
|
158 | 1 | event!( |
159 | 1 | Level::WARN, |
160 | 1 | "Stream already closed when eof already was sent. This is often ok for retry was triggered, but should not happen on happy path." |
161 | | ); |
162 | 1 | return Ok(()); |
163 | 13.5k | } |
164 | 13.5k | |
165 | 13.5k | // Now close our stream. |
166 | 13.5k | self.tx = None; |
167 | 13.5k | Ok(()) |
168 | 13.5k | } |
169 | | |
170 | | /// Returns the number of bytes written so far. This does not mean the receiver received |
171 | | /// all of the bytes written to the stream so far. |
172 | | #[must_use] |
173 | 115 | pub const fn get_bytes_written(&self) -> u64 { |
174 | 115 | self.bytes_written |
175 | 115 | } |
176 | | |
177 | | /// Returns if the pipe was broken. This is good for determining if the reader broke the |
178 | | /// pipe or the writer broke the pipe, since this will only return true if the pipe was |
179 | | /// broken by the writer. |
180 | | #[must_use] |
181 | 2 | pub const fn is_pipe_broken(&self) -> bool { |
182 | 2 | self.tx.is_none() |
183 | 2 | } |
184 | | } |
185 | | |
186 | | /// Reader half of the pair. |
187 | | pub struct DropCloserReadHalf { |
188 | | rx: mpsc::Receiver<Bytes>, |
189 | | /// Number of bytes received over the stream. |
190 | | bytes_received: u64, |
191 | | eof_sent: Arc<AtomicBool>, |
192 | | /// If there was an error in the stream, this will be set to the last error. |
193 | | last_err: Option<Error>, |
194 | | /// If not empty, this is the data that needs to be sent out before |
195 | | /// data from the underlying channel can should be sent. |
196 | | queued_data: VecDeque<Bytes>, |
197 | | /// As data is being read from the stream, this buffer will be filled |
198 | | /// with the most recent data. Once `max_recent_data_size` is reached |
199 | | /// this buffer will be cleared and no longer be populated. |
200 | | /// This is useful if the caller wants to reset the the reader to before |
201 | | /// any of the data was received if possible (eg: something failed and |
202 | | /// we want to retry). |
203 | | recent_data: Vec<Bytes>, |
204 | | /// Amount of data to keep in the `recent_data` buffer before clearing it |
205 | | /// and no longer populating it. |
206 | | max_recent_data_size: u64, |
207 | | } |
208 | | |
209 | | impl DropCloserReadHalf { |
210 | | /// Returns if the stream has data ready. |
211 | 0 | pub fn is_empty(&self) -> bool { |
212 | 0 | self.rx.is_empty() |
213 | 0 | } |
214 | | |
215 | 30.7k | fn recv_inner(&mut self, chunk: Bytes) -> Result<Bytes, Error> { |
216 | 30.7k | // `queued_data` is allowed to have empty bytes that represent EOF |
217 | 30.7k | if chunk.is_empty() { Branch (217:12): [True: 13.0k, False: 17.6k]
Branch (217:12): [Folded - Ignored]
|
218 | 13.0k | if !self.eof_sent.load(Ordering::Acquire) { Branch (218:16): [True: 19, False: 13.0k]
Branch (218:16): [Folded - Ignored]
|
219 | 19 | let err = make_err!(Code::Internal, "Sender dropped before sending EOF"); |
220 | 19 | self.queued_data.clear(); |
221 | 19 | self.recent_data.clear(); |
222 | 19 | self.bytes_received = 0; |
223 | 19 | self.last_err = Some(err.clone()); |
224 | 19 | return Err(err); |
225 | 13.0k | }; |
226 | 13.0k | |
227 | 13.0k | self.maybe_populate_recent_data(&ZERO_DATA); |
228 | 13.0k | return Ok(ZERO_DATA); |
229 | 17.6k | }; |
230 | 17.6k | |
231 | 17.6k | self.bytes_received += chunk.len() as u64; |
232 | 17.6k | self.maybe_populate_recent_data(&chunk); |
233 | 17.6k | Ok(chunk) |
234 | 30.7k | } |
235 | | |
236 | | /// Try to receive a chunk of data, returning `None` if none is available. |
237 | 54.0k | pub fn try_recv(&mut self) -> Option<Result<Bytes, Error>> { |
238 | 54.0k | if let Some(err2 ) = &self.last_err { Branch (238:16): [True: 2, False: 54.0k]
Branch (238:16): [Folded - Ignored]
|
239 | 2 | return Some(Err(err.clone())); |
240 | 54.0k | } |
241 | 54.0k | self.queued_data.pop_front().map(Ok) |
242 | 54.0k | } |
243 | | |
244 | | /// Receive a chunk of data, waiting asynchronously until some is available. |
245 | 54.0k | pub async fn recv(&mut self) -> Result<Bytes, Error> { |
246 | 54.0k | if let Some(result23.2k ) = self.try_recv() { Branch (246:16): [True: 9.76k, False: 5]
Branch (246:16): [True: 10.0k, False: 14.2k]
Branch (246:16): [True: 0, False: 112]
Branch (246:16): [True: 0, False: 2]
Branch (246:16): [True: 0, False: 6]
Branch (246:16): [True: 2.10k, False: 3.20k]
Branch (246:16): [True: 0, False: 0]
Branch (246:16): [True: 0, False: 35]
Branch (246:16): [True: 571, False: 2.79k]
Branch (246:16): [True: 705, False: 9.02k]
Branch (246:16): [True: 0, False: 2]
Branch (246:16): [True: 0, False: 24]
Branch (246:16): [True: 5, False: 1.10k]
Branch (246:16): [True: 0, False: 0]
Branch (246:16): [True: 0, False: 32]
Branch (246:16): [True: 0, False: 7]
Branch (246:16): [Folded - Ignored]
Branch (246:16): [True: 1, False: 9]
Branch (246:16): [True: 0, False: 4]
Branch (246:16): [True: 0, False: 130]
Branch (246:16): [True: 54, False: 60]
Branch (246:16): [True: 0, False: 10]
Branch (246:16): [True: 0, False: 8]
Branch (246:16): [True: 0, False: 0]
|
247 | 23.2k | result |
248 | | } else { |
249 | | // `None` here indicates EOF, which we represent as Zero data |
250 | 30.7k | let data30.7k = self.rx.recv().await.unwrap_or(ZERO_DATA); |
251 | 30.7k | self.recv_inner(data) |
252 | | } |
253 | 54.0k | } |
254 | | |
255 | 30.7k | fn maybe_populate_recent_data(&mut self, chunk: &Bytes) { |
256 | 30.7k | if self.max_recent_data_size == 0 { Branch (256:12): [True: 30.6k, False: 52]
Branch (256:12): [Folded - Ignored]
|
257 | 30.6k | return; // Fast path. |
258 | 52 | } |
259 | 52 | if self.bytes_received > self.max_recent_data_size { Branch (259:12): [True: 0, False: 52]
Branch (259:12): [Folded - Ignored]
|
260 | 0 | if !self.recent_data.is_empty() { Branch (260:16): [True: 0, False: 0]
Branch (260:16): [Folded - Ignored]
|
261 | 0 | self.recent_data.clear(); |
262 | 0 | } |
263 | 0 | return; |
264 | 52 | } |
265 | 52 | self.recent_data.push(chunk.clone()); |
266 | 30.7k | } |
267 | | |
268 | | /// Sets the maximum size of the `recent_data` buffer. If the number of bytes |
269 | | /// received exceeds this size, the `recent_data` buffer will be cleared and |
270 | | /// no longer populated. |
271 | 2 | pub fn set_max_recent_data_size(&mut self, size: u64) { |
272 | 2 | self.max_recent_data_size = size; |
273 | 2 | } |
274 | | |
275 | | /// Attempts to reset the stream to before any data was received. This will |
276 | | /// only work if the number of bytes received is less than `max_recent_data_size`. |
277 | | /// |
278 | | /// On error the state of the stream is undefined and the caller should not |
279 | | /// attempt to use the stream again. |
280 | 1 | pub fn try_reset_stream(&mut self) -> Result<(), Error> { |
281 | 1 | if self.bytes_received > self.max_recent_data_size { Branch (281:12): [True: 0, False: 1]
Branch (281:12): [Folded - Ignored]
|
282 | 0 | return Err(make_err!( |
283 | 0 | Code::Internal, |
284 | 0 | "Cannot reset stream, max_recent_data_size exceeded" |
285 | 0 | )); |
286 | 1 | } |
287 | 1 | let mut data_sum = 0; |
288 | 1 | for chunk0 in self.recent_data.drain(..).rev() { |
289 | 0 | data_sum += chunk.len() as u64; |
290 | 0 | self.queued_data.push_front(chunk); |
291 | 0 | } |
292 | 1 | assert!(self.recent_data.is_empty(), "Recent_data should be empty"0 ); |
293 | | // Ensure the sum of the bytes in recent_data is equal to the bytes_received. |
294 | 0 | error_if!( |
295 | 1 | data_sum != self.bytes_received, Branch (295:13): [True: 0, False: 1]
Branch (295:13): [Folded - Ignored]
|
296 | | "Sum of recent_data bytes does not equal bytes_received" |
297 | | ); |
298 | 1 | self.bytes_received = 0; |
299 | 1 | Ok(()) |
300 | 1 | } |
301 | | |
302 | | /// Drains the reader until an EOF is received, but sends data to the void. |
303 | 1.00k | pub async fn drain(&mut self) -> Result<(), Error> { |
304 | | loop { |
305 | 1.00k | if self Branch (305:16): [True: 2, False: 0]
Branch (305:16): [Folded - Ignored]
Branch (305:16): [True: 1.00k, False: 0]
Branch (305:16): [True: 0, False: 0]
Branch (305:16): [True: 1, False: 2]
Branch (305:16): [Folded - Ignored]
|
306 | 1.00k | .recv() |
307 | 1.00k | .await |
308 | 1.00k | .err_tip(|| "Failed to drain in buf_channel::drain"0 )?0 |
309 | 1.00k | .is_empty() |
310 | | { |
311 | 1.00k | break; // EOF. |
312 | 2 | } |
313 | | } |
314 | 1.00k | Ok(()) |
315 | 1.00k | } |
316 | | |
317 | | /// Peek the next set of bytes in the stream without consuming them. |
318 | 13.4k | pub async fn peek(&mut self) -> Result<&Bytes, Error> { |
319 | 13.4k | if self.queued_data.is_empty() { Branch (319:12): [True: 2, False: 0]
Branch (319:12): [True: 6.93k, False: 0]
Branch (319:12): [Folded - Ignored]
Branch (319:12): [True: 1, False: 0]
Branch (319:12): [True: 1.36k, False: 0]
Branch (319:12): [True: 0, False: 0]
Branch (319:12): [True: 17, False: 0]
Branch (319:12): [True: 1.02k, False: 0]
Branch (319:12): [True: 3.99k, False: 0]
Branch (319:12): [True: 1, False: 0]
Branch (319:12): [True: 10, False: 0]
Branch (319:12): [True: 8, False: 0]
Branch (319:12): [True: 0, False: 0]
Branch (319:12): [True: 8, False: 0]
Branch (319:12): [True: 2, False: 0]
Branch (319:12): [Folded - Ignored]
Branch (319:12): [True: 4, False: 0]
Branch (319:12): [True: 2, False: 0]
Branch (319:12): [True: 25, False: 0]
Branch (319:12): [True: 53, False: 0]
Branch (319:12): [True: 5, False: 0]
Branch (319:12): [True: 4, False: 0]
Branch (319:12): [True: 0, False: 0]
|
320 | 13.4k | let chunk13.4k = self.recv().await.err_tip(|| "In buf_channel::peek"5 )?5 ; |
321 | 13.4k | self.queued_data.push_front(chunk); |
322 | 0 | } |
323 | 13.4k | Ok(self |
324 | 13.4k | .queued_data |
325 | 13.4k | .front() |
326 | 13.4k | .expect("Should have data in the queue")) |
327 | 13.4k | } |
328 | | |
329 | | /// The number of bytes received over this stream so far. |
330 | 0 | pub fn get_bytes_received(&self) -> u64 { |
331 | 0 | self.bytes_received |
332 | 0 | } |
333 | | |
334 | | /// Takes exactly `size` number of bytes from the stream and returns them. |
335 | | /// This means the stream will keep polling until either an EOF is received or |
336 | | /// `size` bytes are received and concat them all together then return them. |
337 | | /// This method is optimized to reduce copies when possible. |
338 | | /// If `size` is None, it will take all the bytes in the stream. |
339 | 34.2k | pub async fn consume(&mut self, size: Option<usize>) -> Result<Bytes, Error> { |
340 | 34.2k | let size = size.unwrap_or(usize::MAX); |
341 | 1.73k | let first_chunk = { |
342 | 34.2k | let mut chunk34.2k = self |
343 | 34.2k | .recv() |
344 | 34.2k | .await |
345 | 34.2k | .err_tip(|| "During first read of buf_channel::take()"9 )?9 ; |
346 | 34.2k | if chunk.is_empty() { Branch (346:16): [True: 2, False: 9.76k]
Branch (346:16): [True: 62, False: 16.9k]
Branch (346:16): [Folded - Ignored]
Branch (346:16): [True: 0, False: 1]
Branch (346:16): [True: 0, False: 2.00k]
Branch (346:16): [True: 0, False: 0]
Branch (346:16): [True: 0, False: 17]
Branch (346:16): [True: 207, False: 1.02k]
Branch (346:16): [True: 60, False: 3.99k]
Branch (346:16): [True: 0, False: 1]
Branch (346:16): [True: 0, False: 10]
Branch (346:16): [True: 1, False: 7]
Branch (346:16): [True: 0, False: 0]
Branch (346:16): [True: 8, False: 8]
Branch (346:16): [True: 2, False: 2]
Branch (346:16): [Folded - Ignored]
Branch (346:16): [True: 0, False: 4]
Branch (346:16): [True: 0, False: 2]
Branch (346:16): [True: 0, False: 25]
Branch (346:16): [True: 4, False: 5]
Branch (346:16): [True: 0, False: 5]
Branch (346:16): [True: 0, False: 4]
Branch (346:16): [True: 0, False: 0]
|
347 | 346 | return Ok(chunk); // EOF. |
348 | 33.8k | } |
349 | 33.8k | if chunk.len() > size { Branch (349:16): [True: 9.76k, False: 2]
Branch (349:16): [True: 10.0k, False: 6.93k]
Branch (349:16): [Folded - Ignored]
Branch (349:16): [True: 0, False: 1]
Branch (349:16): [True: 645, False: 1.36k]
Branch (349:16): [True: 0, False: 0]
Branch (349:16): [True: 0, False: 17]
Branch (349:16): [True: 0, False: 1.02k]
Branch (349:16): [True: 0, False: 3.99k]
Branch (349:16): [True: 0, False: 1]
Branch (349:16): [True: 0, False: 10]
Branch (349:16): [True: 0, False: 7]
Branch (349:16): [True: 0, False: 0]
Branch (349:16): [True: 0, False: 8]
Branch (349:16): [True: 0, False: 2]
Branch (349:16): [Folded - Ignored]
Branch (349:16): [True: 0, False: 4]
Branch (349:16): [True: 0, False: 2]
Branch (349:16): [True: 0, False: 25]
Branch (349:16): [True: 2, False: 3]
Branch (349:16): [True: 0, False: 5]
Branch (349:16): [True: 0, False: 4]
Branch (349:16): [True: 0, False: 0]
|
350 | 20.4k | let remaining = chunk.split_off(size); |
351 | 20.4k | self.queued_data.push_front(remaining); |
352 | 20.4k | // No need to read EOF if we are a partial chunk. |
353 | 20.4k | return Ok(chunk); |
354 | 13.4k | } |
355 | 13.4k | // Try to read our EOF to ensure our sender did not error out. |
356 | 13.4k | match self.peek().await { |
357 | 13.3k | Ok(peeked_chunk) => { |
358 | 13.3k | if peeked_chunk.is_empty() || chunk.len() == size1.91k { Branch (358:24): [True: 2, False: 0]
Branch (358:51): [True: 0, False: 0]
Branch (358:24): [True: 6.91k, False: 13]
Branch (358:51): [True: 0, False: 13]
Branch (358:24): [Folded - Ignored]
Branch (358:51): [Folded - Ignored]
Branch (358:24): [True: 1, False: 0]
Branch (358:51): [True: 0, False: 0]
Branch (358:24): [True: 748, False: 616]
Branch (358:51): [True: 180, False: 436]
Branch (358:24): [True: 0, False: 0]
Branch (358:51): [True: 0, False: 0]
Branch (358:24): [True: 17, False: 0]
Branch (358:51): [True: 0, False: 0]
Branch (358:24): [True: 454, False: 571]
Branch (358:51): [True: 0, False: 571]
Branch (358:24): [True: 3.28k, False: 705]
Branch (358:51): [True: 0, False: 705]
Branch (358:24): [True: 1, False: 0]
Branch (358:51): [True: 0, False: 0]
Branch (358:24): [True: 10, False: 0]
Branch (358:51): [True: 0, False: 0]
Branch (358:24): [True: 3, False: 4]
Branch (358:51): [True: 1, False: 3]
Branch (358:24): [True: 0, False: 0]
Branch (358:51): [True: 0, False: 0]
Branch (358:24): [True: 8, False: 0]
Branch (358:51): [True: 0, False: 0]
Branch (358:24): [True: 2, False: 0]
Branch (358:51): [True: 0, False: 0]
Branch (358:24): [Folded - Ignored]
Branch (358:51): [Folded - Ignored]
Branch (358:24): [True: 2, False: 2]
Branch (358:51): [True: 1, False: 1]
Branch (358:24): [True: 2, False: 0]
Branch (358:51): [True: 0, False: 0]
Branch (358:24): [True: 25, False: 0]
Branch (358:51): [True: 0, False: 0]
Branch (358:24): [True: 2, False: 1]
Branch (358:51): [True: 0, False: 1]
Branch (358:24): [True: 5, False: 0]
Branch (358:51): [True: 0, False: 0]
Branch (358:24): [True: 4, False: 0]
Branch (358:51): [True: 0, False: 0]
Branch (358:24): [True: 0, False: 0]
Branch (358:51): [True: 0, False: 0]
|
359 | 11.6k | return Ok(chunk); |
360 | 1.73k | } |
361 | | } |
362 | 4 | Err(e) => { |
363 | 4 | return Err(e.clone()).err_tip(|| "Failed to check if next chunk is EOF")? |
364 | | } |
365 | | } |
366 | 1.73k | chunk |
367 | 1.73k | }; |
368 | 1.73k | let mut output = BytesMut::new(); |
369 | 1.73k | output.extend_from_slice(&first_chunk); |
370 | | |
371 | | loop { |
372 | 4.94k | let mut chunk4.94k = self |
373 | 4.94k | .recv() |
374 | 4.94k | .await |
375 | 4.94k | .err_tip(|| "During first read of buf_channel::take()"1 )?1 ; |
376 | 4.94k | if chunk.is_empty() { Branch (376:16): [True: 0, False: 0]
Branch (376:16): [True: 13, False: 168]
Branch (376:16): [Folded - Ignored]
Branch (376:16): [True: 0, False: 0]
Branch (376:16): [True: 180, False: 746]
Branch (376:16): [True: 0, False: 0]
Branch (376:16): [True: 0, False: 0]
Branch (376:16): [True: 271, False: 834]
Branch (376:16): [True: 354, False: 1.33k]
Branch (376:16): [True: 0, False: 0]
Branch (376:16): [True: 0, False: 0]
Branch (376:16): [True: 2, False: 1.04k]
Branch (376:16): [True: 0, False: 0]
Branch (376:16): [True: 0, False: 0]
Branch (376:16): [True: 0, False: 0]
Branch (376:16): [Folded - Ignored]
Branch (376:16): [True: 0, False: 1]
Branch (376:16): [True: 0, False: 0]
Branch (376:16): [True: 0, False: 0]
Branch (376:16): [True: 0, False: 1]
Branch (376:16): [True: 0, False: 0]
Branch (376:16): [True: 0, False: 0]
Branch (376:16): [True: 0, False: 0]
|
377 | 820 | break; // EOF. |
378 | 4.12k | } |
379 | 4.12k | if output.len() + chunk.len() > size { Branch (379:16): [True: 0, False: 0]
Branch (379:16): [True: 0, False: 168]
Branch (379:16): [Folded - Ignored]
Branch (379:16): [True: 0, False: 0]
Branch (379:16): [True: 99, False: 647]
Branch (379:16): [True: 0, False: 0]
Branch (379:16): [True: 0, False: 0]
Branch (379:16): [True: 0, False: 834]
Branch (379:16): [True: 0, False: 1.33k]
Branch (379:16): [True: 0, False: 0]
Branch (379:16): [True: 0, False: 0]
Branch (379:16): [True: 0, False: 1.04k]
Branch (379:16): [True: 0, False: 0]
Branch (379:16): [True: 0, False: 0]
Branch (379:16): [True: 0, False: 0]
Branch (379:16): [Folded - Ignored]
Branch (379:16): [True: 0, False: 1]
Branch (379:16): [True: 0, False: 0]
Branch (379:16): [True: 0, False: 0]
Branch (379:16): [True: 0, False: 1]
Branch (379:16): [True: 0, False: 0]
Branch (379:16): [True: 0, False: 0]
Branch (379:16): [True: 0, False: 0]
|
380 | 99 | // Slice off the extra data and put it back into the queue. We are done. |
381 | 99 | let remaining = chunk.split_off(size - output.len()); |
382 | 99 | self.queued_data.push_front(remaining); |
383 | 4.02k | } |
384 | 4.12k | output.extend_from_slice(&chunk); |
385 | 4.12k | if output.len() == size { Branch (385:16): [True: 0, False: 0]
Branch (385:16): [True: 0, False: 168]
Branch (385:16): [Folded - Ignored]
Branch (385:16): [True: 0, False: 0]
Branch (385:16): [True: 256, False: 490]
Branch (385:16): [True: 0, False: 0]
Branch (385:16): [True: 0, False: 0]
Branch (385:16): [True: 300, False: 534]
Branch (385:16): [True: 350, False: 982]
Branch (385:16): [True: 0, False: 0]
Branch (385:16): [True: 0, False: 0]
Branch (385:16): [True: 1, False: 1.03k]
Branch (385:16): [True: 0, False: 0]
Branch (385:16): [True: 0, False: 0]
Branch (385:16): [True: 0, False: 0]
Branch (385:16): [Folded - Ignored]
Branch (385:16): [True: 1, False: 0]
Branch (385:16): [True: 0, False: 0]
Branch (385:16): [True: 0, False: 0]
Branch (385:16): [True: 1, False: 0]
Branch (385:16): [True: 0, False: 0]
Branch (385:16): [True: 0, False: 0]
Branch (385:16): [True: 0, False: 0]
|
386 | 909 | break; // We are done. |
387 | 3.21k | } |
388 | | } |
389 | 1.72k | Ok(output.freeze()) |
390 | 34.2k | } |
391 | | } |
392 | | |
393 | | impl Stream for DropCloserReadHalf { |
394 | | type Item = Result<Bytes, std::io::Error>; |
395 | | |
396 | | // TODO(blaise.bruer) This is not very efficient as we are creating a new future on every |
397 | | // poll() call. It might be better to use a waker. |
398 | 112 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
399 | 112 | Box::pin(self.recv()) |
400 | 112 | .as_mut() |
401 | 112 | .poll(cx) |
402 | 112 | .map(|result| match result80 { |
403 | 79 | Ok(bytes) => { |
404 | 79 | if bytes.is_empty() { Branch (404:24): [True: 13, False: 66]
Branch (404:24): [Folded - Ignored]
|
405 | 13 | return None; |
406 | 66 | } |
407 | 66 | Some(Ok(bytes)) |
408 | | } |
409 | 1 | Err(e) => Some(Err(e.to_std_err())), |
410 | 112 | }80 ) |
411 | 112 | } |
412 | | } |