/build/source/nativelink-util/src/channel_body_for_tests.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::pin::Pin; |
16 | | use core::task::{Context, ready}; |
17 | | |
18 | | use bytes::Bytes; |
19 | | use futures::task::Poll; |
20 | | use hyper::body::{Body, Frame}; |
21 | | use pin_project_lite::pin_project; |
22 | | use tokio::sync::mpsc; |
23 | | |
24 | | const DEFAULT_CHANNEL_BODY_BUFFER_SIZE: usize = 32; |
25 | | |
26 | | pin_project! { |
27 | | pub struct ChannelBody { |
28 | | #[pin] |
29 | | rx: mpsc::Receiver<Frame<Bytes>>, |
30 | | } |
31 | | } |
32 | | |
33 | | // Note: At the moment this is only used in a few tests after tonic removed its |
34 | | // concrete Body type. See `nativelink_util::buf_channel` for channel |
35 | | // implementations used in non-test code and see the tests for example usage. |
36 | | impl ChannelBody { |
37 | | #[must_use] |
38 | 31 | pub fn new() -> (mpsc::Sender<Frame<Bytes>>, Self) { |
39 | 31 | Self::with_buffer_size(DEFAULT_CHANNEL_BODY_BUFFER_SIZE) |
40 | 31 | } |
41 | | |
42 | | #[must_use] |
43 | 33 | pub fn with_buffer_size(buffer_size: usize) -> (mpsc::Sender<Frame<Bytes>>, Self) { |
44 | 33 | let (tx, rx) = mpsc::channel(buffer_size); |
45 | 33 | (tx, Self { rx }) |
46 | 33 | } |
47 | | } |
48 | | |
49 | | impl Body for ChannelBody { |
50 | | type Data = Bytes; |
51 | | type Error = tonic::Status; |
52 | | |
53 | 199 | fn poll_frame( |
54 | 199 | self: Pin<&mut Self>, |
55 | 199 | cx: &mut Context<'_>, |
56 | 199 | ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { |
57 | 199 | let frame79 = ready!120 (self.project().rx.poll_recv(cx)); |
58 | 79 | Poll::Ready(frame.map(Ok)) |
59 | 199 | } |
60 | | |
61 | 26 | fn is_end_stream(&self) -> bool { |
62 | 26 | self.rx.is_closed() |
63 | 26 | } |
64 | | } |