/build/source/nativelink-service/src/bep_server.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::borrow::Cow; |
16 | | use std::pin::Pin; |
17 | | |
18 | | use bytes::BytesMut; |
19 | | use futures::stream::unfold; |
20 | | use futures::Stream; |
21 | | use nativelink_error::{Error, ResultExt}; |
22 | | use nativelink_proto::google::devtools::build::v1::publish_build_event_server::{ |
23 | | PublishBuildEvent, PublishBuildEventServer, |
24 | | }; |
25 | | use nativelink_proto::google::devtools::build::v1::{ |
26 | | PublishBuildToolEventStreamRequest, PublishBuildToolEventStreamResponse, |
27 | | PublishLifecycleEventRequest, |
28 | | }; |
29 | | use nativelink_store::store_manager::StoreManager; |
30 | | use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike}; |
31 | | use prost::Message; |
32 | | use tonic::{Request, Response, Result, Status, Streaming}; |
33 | | use tracing::{instrument, Level}; |
34 | | |
35 | | pub struct BepServer { |
36 | | store: Store, |
37 | | } |
38 | | |
39 | | impl BepServer { |
40 | 2 | pub fn new( |
41 | 2 | config: &nativelink_config::cas_server::BepConfig, |
42 | 2 | store_manager: &StoreManager, |
43 | 2 | ) -> Result<Self, Error> { |
44 | 2 | let store = store_manager |
45 | 2 | .get_store(&config.store) |
46 | 2 | .err_tip(|| format!("Expected store {} to exist in store manager", &config.store)0 )?0 ; |
47 | | |
48 | 2 | Ok(Self { store }) |
49 | 2 | } |
50 | | |
51 | 0 | pub fn into_service(self) -> PublishBuildEventServer<BepServer> { |
52 | 0 | PublishBuildEventServer::new(self) |
53 | 0 | } |
54 | | |
55 | 1 | async fn inner_publish_lifecycle_event( |
56 | 1 | &self, |
57 | 1 | request: PublishLifecycleEventRequest, |
58 | 1 | ) -> Result<Response<()>, Error> { |
59 | 1 | let build_event = request |
60 | 1 | .build_event |
61 | 1 | .as_ref() |
62 | 1 | .err_tip(|| "Expected build_event to be set"0 )?0 ; |
63 | 1 | let stream_id = build_event |
64 | 1 | .stream_id |
65 | 1 | .as_ref() |
66 | 1 | .err_tip(|| "Expected stream_id to be set"0 )?0 ; |
67 | | |
68 | 1 | let sequence_number = build_event.sequence_number; |
69 | 1 | |
70 | 1 | let mut buf = BytesMut::new(); |
71 | 1 | request |
72 | 1 | .encode(&mut buf) |
73 | 1 | .err_tip(|| "Could not encode PublishLifecycleEventRequest proto"0 )?0 ; |
74 | | |
75 | 1 | self.store |
76 | 1 | .update_oneshot( |
77 | 1 | StoreKey::Str(Cow::Owned(format!( |
78 | 1 | "LifecycleEvent:{}:{}:{}", |
79 | 1 | &stream_id.build_id, &stream_id.invocation_id, sequence_number, |
80 | 1 | ))), |
81 | 1 | buf.freeze(), |
82 | 1 | ) |
83 | 0 | .await |
84 | 1 | .err_tip(|| "Failed to store PublishLifecycleEventRequest"0 )?0 ; |
85 | | |
86 | 1 | Ok(Response::new(())) |
87 | 1 | } |
88 | | |
89 | 1 | async fn inner_publish_build_tool_event_stream( |
90 | 1 | &self, |
91 | 1 | stream: Streaming<PublishBuildToolEventStreamRequest>, |
92 | 1 | ) -> Result<Response<PublishBuildToolEventStreamStream>, Error> { |
93 | 5 | async fn process_request( |
94 | 5 | store: Pin<&dyn StoreDriver>, |
95 | 5 | request: PublishBuildToolEventStreamRequest, |
96 | 5 | ) -> Result<PublishBuildToolEventStreamResponse, Status> { |
97 | 5 | let ordered_build_event = request |
98 | 5 | .ordered_build_event |
99 | 5 | .as_ref() |
100 | 5 | .err_tip(|| "Expected ordered_build_event to be set"0 )?0 ; |
101 | 5 | let stream_id = ordered_build_event |
102 | 5 | .stream_id |
103 | 5 | .as_ref() |
104 | 5 | .err_tip(|| "Expected stream_id to be set"0 )?0 ; |
105 | | |
106 | 5 | let sequence_number = ordered_build_event.sequence_number; |
107 | 5 | |
108 | 5 | let mut buf = BytesMut::new(); |
109 | 5 | |
110 | 5 | request |
111 | 5 | .encode(&mut buf) |
112 | 5 | .err_tip(|| "Could not encode PublishBuildToolEventStreamRequest proto"0 )?0 ; |
113 | | |
114 | 5 | store |
115 | 5 | .update_oneshot( |
116 | 5 | StoreKey::Str(Cow::Owned(format!( |
117 | 5 | "BuildToolEventStream:{}:{}:{}", |
118 | 5 | &stream_id.build_id, &stream_id.invocation_id, sequence_number, |
119 | 5 | ))), |
120 | 5 | buf.freeze(), |
121 | 5 | ) |
122 | 0 | .await |
123 | 5 | .err_tip(|| "Failed to store PublishBuildToolEventStreamRequest"0 )?0 ; |
124 | | |
125 | 5 | Ok(PublishBuildToolEventStreamResponse { |
126 | 5 | stream_id: Some(stream_id.clone()), |
127 | 5 | sequence_number, |
128 | 5 | }) |
129 | 5 | } |
130 | | |
131 | | struct State { |
132 | | store: Store, |
133 | | stream: Streaming<PublishBuildToolEventStreamRequest>, |
134 | | } |
135 | | |
136 | 1 | let response_stream = |
137 | 1 | unfold( |
138 | 1 | Some(State { |
139 | 1 | store: self.store.clone(), |
140 | 1 | stream, |
141 | 1 | }), |
142 | 5 | move |maybe_state| async move { |
143 | 5 | let mut state = maybe_state?0 ; |
144 | 5 | let request = |
145 | 5 | match state.stream.message().await0 .err_tip(|| { |
146 | 0 | "While receiving message in publish_build_tool_event_stream" |
147 | 5 | }) { |
148 | 5 | Ok(Some(request)) => request, |
149 | 0 | Ok(None) => return None, |
150 | 0 | Err(e) => return Some((Err(e.into()), None)), |
151 | | }; |
152 | 5 | process_request(state.store.as_store_driver_pin(), request) |
153 | 0 | .await |
154 | 5 | .map_or_else( |
155 | 5 | |e| Some((Err(e), None))0 , |
156 | 5 | |response| Some((Ok(response), Some(state))), |
157 | 5 | ) |
158 | 10 | }, |
159 | 1 | ); |
160 | 1 | |
161 | 1 | Ok(Response::new(Box::pin(response_stream))) |
162 | 1 | } |
163 | | } |
164 | | |
165 | | type PublishBuildToolEventStreamStream = Pin< |
166 | | Box<dyn Stream<Item = Result<PublishBuildToolEventStreamResponse, Status>> + Send + 'static>, |
167 | | >; |
168 | | |
169 | | #[tonic::async_trait] |
170 | | impl PublishBuildEvent for BepServer { |
171 | | type PublishBuildToolEventStreamStream = PublishBuildToolEventStreamStream; |
172 | | |
173 | | #[allow(clippy::blocks_in_conditions)] |
174 | 1 | #[instrument( |
175 | | err, |
176 | | ret(level = Level::INFO), |
177 | | level = Level::ERROR, |
178 | | skip_all, |
179 | | fields(request = ?grpc_request.get_ref()) |
180 | 2 | )] |
181 | | async fn publish_lifecycle_event( |
182 | | &self, |
183 | | grpc_request: Request<PublishLifecycleEventRequest>, |
184 | 1 | ) -> Result<Response<()>, Status> { |
185 | 1 | self.inner_publish_lifecycle_event(grpc_request.into_inner()) |
186 | 0 | .await |
187 | 1 | .map_err(Error::into) |
188 | 2 | } |
189 | | |
190 | | #[allow(clippy::blocks_in_conditions)] |
191 | 1 | #[instrument( |
192 | | err, |
193 | | level = Level::ERROR, |
194 | | skip_all, |
195 | | fields(request = ?grpc_request.get_ref()) |
196 | 2 | )] |
197 | | async fn publish_build_tool_event_stream( |
198 | | &self, |
199 | | grpc_request: Request<Streaming<PublishBuildToolEventStreamRequest>>, |
200 | 1 | ) -> Result<Response<Self::PublishBuildToolEventStreamStream>, Status> { |
201 | 1 | self.inner_publish_build_tool_event_stream(grpc_request.into_inner()) |
202 | 0 | .await |
203 | 1 | .map_err(Error::into) |
204 | 2 | } |
205 | | } |