Line | Count | Source |
1 | // Copyright 2024 The NativeLink Authors. All rights reserved. | |
2 | // | |
3 | // Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | // you may not use this file except in compliance with the License. | |
5 | // You may obtain a copy of the License at | |
6 | // | |
7 | // http://www.apache.org/licenses/LICENSE-2.0 | |
8 | // | |
9 | // Unless required by applicable law or agreed to in writing, software | |
10 | // distributed under the License is distributed on an "AS IS" BASIS, | |
11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | // See the License for the specific language governing permissions and | |
13 | // limitations under the License. | |
14 | ||
15 | use std::pin::Pin; | |
16 | use std::task::{ready, Context}; | |
17 | ||
18 | use bytes::Bytes; | |
19 | use futures::task::Poll; | |
20 | use hyper::body::{Body, Frame}; | |
21 | use pin_project_lite::pin_project; | |
22 | use tokio::sync::mpsc; | |
23 | ||
/// Channel capacity (in frames) used by `ChannelBody::new`, which forwards
/// this value to `ChannelBody::with_buffer_size`.
const DEFAULT_CHANNEL_BODY_BUFFER_SIZE: usize = 32;
25 | ||
26 | pin_project! { | |
27 | pub struct ChannelBody { | |
28 | #[pin] | |
29 | rx: mpsc::Receiver<Frame<Bytes>>, | |
30 | } | |
31 | } | |
32 | ||
33 | // Note: At the moment this is only used in a few tests after tonic removed its | |
34 | // concrete Body type. See `nativelink_util::buf_channel` for channel | |
35 | // implementations used in non-test code and see the tests for example usage. | |
36 | impl ChannelBody { | |
37 | #[must_use] | |
38 | 27 | pub fn new() -> (mpsc::Sender<Frame<Bytes>>, Self) { |
39 | 27 | Self::with_buffer_size(DEFAULT_CHANNEL_BODY_BUFFER_SIZE) |
40 | 27 | } |
41 | ||
42 | #[must_use] | |
43 | 29 | pub fn with_buffer_size(buffer_size: usize) -> (mpsc::Sender<Frame<Bytes>>, Self) { |
44 | 29 | let (tx, rx) = mpsc::channel(buffer_size); |
45 | 29 | (tx, Self { rx }) |
46 | 29 | } |
47 | } | |
48 | ||
49 | impl Body for ChannelBody { | |
50 | type Data = Bytes; | |
51 | type Error = tonic::Status; | |
52 | ||
53 | 188 | fn poll_frame( |
54 | 188 | self: Pin<&mut Self>, |
55 | 188 | cx: &mut Context<'_>, |
56 | 188 | ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { |
57 | 188 | let |
58 | 68 | Poll::Ready(frame.map(Ok)) |
59 | 188 | } |
60 | ||
61 | 3 | fn is_end_stream(&self) -> bool { |
62 | 3 | self.rx.is_closed() |
63 | 3 | } |
64 | } |