/build/source/nativelink-service/src/bep_server.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::pin::Pin; |
16 | | use std::borrow::Cow; |
17 | | |
18 | | use bytes::BytesMut; |
19 | | use futures::Stream; |
20 | | use futures::stream::unfold; |
21 | | use nativelink_error::{Error, ResultExt}; |
22 | | use nativelink_proto::com::github::trace_machina::nativelink::events::{BepEvent, bep_event}; |
23 | | use nativelink_proto::google::devtools::build::v1::publish_build_event_server::{ |
24 | | PublishBuildEvent, PublishBuildEventServer, |
25 | | }; |
26 | | use nativelink_proto::google::devtools::build::v1::{ |
27 | | PublishBuildToolEventStreamRequest, PublishBuildToolEventStreamResponse, |
28 | | PublishLifecycleEventRequest, |
29 | | }; |
30 | | use nativelink_store::store_manager::StoreManager; |
31 | | use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike}; |
32 | | use opentelemetry::baggage::BaggageExt; |
33 | | use opentelemetry::context::Context; |
34 | | use opentelemetry_semantic_conventions::attribute::ENDUSER_ID; |
35 | | use prost::Message; |
36 | | use tonic::{Request, Response, Result, Status, Streaming}; |
37 | | use tracing::{Level, instrument}; |
38 | | |
39 | | /// Current version of the BEP event. This might be used in the future if |
40 | | /// there is a breaking change in the BEP event format. |
41 | | const BEP_EVENT_VERSION: u32 = 0; |
42 | | |
43 | | #[allow(clippy::result_large_err, reason = "TODO Fix this. Breaks on nightly")] |
44 | 3 | fn get_identity() -> Option<String> { |
45 | 3 | Context::current() |
46 | 3 | .baggage() |
47 | 3 | .get(ENDUSER_ID) |
48 | 3 | .map(|value| value.as_str()0 .to_string0 ()) |
49 | 3 | } |
50 | | |
51 | | #[derive(Debug)] |
52 | | pub struct BepServer { |
53 | | store: Store, |
54 | | } |
55 | | |
56 | | impl BepServer { |
57 | 3 | pub fn new( |
58 | 3 | config: &nativelink_config::cas_server::BepConfig, |
59 | 3 | store_manager: &StoreManager, |
60 | 3 | ) -> Result<Self, Error> { |
61 | 3 | let store = store_manager |
62 | 3 | .get_store(&config.store) |
63 | 3 | .err_tip(|| format!("Expected store {} to exist in store manager"0 , &config.store0 ))?0 ; |
64 | | |
65 | 3 | Ok(Self { store }) |
66 | 3 | } |
67 | | |
68 | 0 | pub fn into_service(self) -> PublishBuildEventServer<Self> { |
69 | 0 | PublishBuildEventServer::new(self) |
70 | 0 | } |
71 | | |
72 | 1 | async fn inner_publish_lifecycle_event( |
73 | 1 | &self, |
74 | 1 | request: PublishLifecycleEventRequest, |
75 | 1 | identity: Option<String>, |
76 | 1 | ) -> Result<Response<()>, Error> { |
77 | 1 | let build_event = request |
78 | 1 | .build_event |
79 | 1 | .as_ref() |
80 | 1 | .err_tip(|| "Expected build_event to be set")?0 ; |
81 | 1 | let stream_id = build_event |
82 | 1 | .stream_id |
83 | 1 | .as_ref() |
84 | 1 | .err_tip(|| "Expected stream_id to be set")?0 ; |
85 | | |
86 | 1 | let sequence_number = build_event.sequence_number; |
87 | | |
88 | 1 | let store_key = StoreKey::Str(Cow::Owned(format!( |
89 | 1 | "BepEvent:le:{}:{}:{}", |
90 | 1 | &stream_id.build_id, &stream_id.invocation_id, sequence_number, |
91 | 1 | ))); |
92 | | |
93 | 1 | let bep_event = BepEvent { |
94 | 1 | version: BEP_EVENT_VERSION, |
95 | 1 | identity: identity.unwrap_or_default(), |
96 | 1 | event: Some(bep_event::Event::LifecycleEvent(request)), |
97 | 1 | }; |
98 | 1 | let mut buf = BytesMut::new(); |
99 | 1 | bep_event |
100 | 1 | .encode(&mut buf) |
101 | 1 | .err_tip(|| "Could not encode PublishLifecycleEventRequest proto")?0 ; |
102 | | |
103 | 1 | self.store |
104 | 1 | .update_oneshot(store_key, buf.freeze()) |
105 | 1 | .await |
106 | 1 | .err_tip(|| "Failed to store PublishLifecycleEventRequest")?0 ; |
107 | | |
108 | 1 | Ok(Response::new(())) |
109 | 1 | } |
110 | | |
111 | 2 | async fn inner_publish_build_tool_event_stream( |
112 | 2 | &self, |
113 | 2 | stream: Streaming<PublishBuildToolEventStreamRequest>, |
114 | 2 | identity: Option<String>, |
115 | 2 | ) -> Result<Response<PublishBuildToolEventStreamStream>, Error> { |
116 | 6 | async fn process_request( |
117 | 6 | store: Pin<&dyn StoreDriver>, |
118 | 6 | request: PublishBuildToolEventStreamRequest, |
119 | 6 | identity: String, |
120 | 6 | ) -> Result<PublishBuildToolEventStreamResponse, Status> { |
121 | 6 | let ordered_build_event = request |
122 | 6 | .ordered_build_event |
123 | 6 | .as_ref() |
124 | 6 | .err_tip(|| "Expected ordered_build_event to be set")?0 ; |
125 | 6 | let stream_id = ordered_build_event |
126 | 6 | .stream_id |
127 | 6 | .as_ref() |
128 | 6 | .err_tip(|| "Expected stream_id to be set")?0 |
129 | 6 | .clone(); |
130 | | |
131 | 6 | let sequence_number = ordered_build_event.sequence_number; |
132 | | |
133 | 6 | let bep_event = BepEvent { |
134 | 6 | version: BEP_EVENT_VERSION, |
135 | 6 | identity, |
136 | 6 | event: Some(bep_event::Event::BuildToolEvent(request)), |
137 | 6 | }; |
138 | 6 | let mut buf = BytesMut::new(); |
139 | | |
140 | 6 | bep_event |
141 | 6 | .encode(&mut buf) |
142 | 6 | .err_tip(|| "Could not encode PublishBuildToolEventStreamRequest proto")?0 ; |
143 | | |
144 | 6 | store |
145 | 6 | .update_oneshot( |
146 | 6 | StoreKey::Str(Cow::Owned(format!( |
147 | 6 | "BepEvent:be:{}:{}:{}", |
148 | 6 | &stream_id.build_id, &stream_id.invocation_id, sequence_number, |
149 | 6 | ))), |
150 | 6 | buf.freeze(), |
151 | 6 | ) |
152 | 6 | .await |
153 | 6 | .err_tip(|| "Failed to store PublishBuildToolEventStreamRequest")?0 ; |
154 | | |
155 | 6 | Ok(PublishBuildToolEventStreamResponse { |
156 | 6 | stream_id: Some(stream_id.clone()), |
157 | 6 | sequence_number, |
158 | 6 | }) |
159 | 6 | } |
160 | | |
161 | | struct State { |
162 | | store: Store, |
163 | | stream: Streaming<PublishBuildToolEventStreamRequest>, |
164 | | identity: String, |
165 | | } |
166 | | |
167 | 2 | let response_stream = |
168 | 2 | unfold( |
169 | 2 | Some(State { |
170 | 2 | store: self.store.clone(), |
171 | 2 | stream, |
172 | 2 | identity: identity.unwrap_or_default(), |
173 | 2 | }), |
174 | 7 | move |maybe_state| async move { |
175 | 7 | let mut state = maybe_state?0 ; |
176 | 6 | let request = |
177 | 7 | match state.stream.message().await.err_tip( |
178 | | || "While receiving message in publish_build_tool_event_stream", |
179 | | ) { |
180 | 6 | Ok(Some(request)) => request, |
181 | 1 | Ok(None) => return None, |
182 | 0 | Err(e) => return Some((Err(e.into()), None)), |
183 | | }; |
184 | 6 | process_request( |
185 | 6 | state.store.as_store_driver_pin(), |
186 | 6 | request, |
187 | 6 | state.identity.clone(), |
188 | 6 | ) |
189 | 6 | .await |
190 | 6 | .map_or_else( |
191 | 0 | |e| Some((Err(e), None)), |
192 | 6 | |response| Some((Ok(response), Some(state))), |
193 | | ) |
194 | 14 | }, |
195 | | ); |
196 | | |
197 | 2 | Ok(Response::new(Box::pin(response_stream))) |
198 | 2 | } |
199 | | } |
200 | | |
201 | | type PublishBuildToolEventStreamStream = Pin< |
202 | | Box<dyn Stream<Item = Result<PublishBuildToolEventStreamResponse, Status>> + Send + 'static>, |
203 | | >; |
204 | | |
205 | | #[tonic::async_trait] |
206 | | impl PublishBuildEvent for BepServer { |
207 | | type PublishBuildToolEventStreamStream = PublishBuildToolEventStreamStream; |
208 | | |
209 | | #[instrument( |
210 | | err, |
211 | | ret(level = Level::INFO), |
212 | | level = Level::ERROR, |
213 | | skip_all, |
214 | | fields(request = ?grpc_request.get_ref()) |
215 | | )] |
216 | | async fn publish_lifecycle_event( |
217 | | &self, |
218 | | grpc_request: Request<PublishLifecycleEventRequest>, |
219 | 2 | ) -> Result<Response<()>, Status> { |
220 | 1 | self.inner_publish_lifecycle_event(grpc_request.into_inner(), get_identity()) |
221 | 1 | .await |
222 | 1 | .map_err(Error::into) |
223 | 2 | } |
224 | | |
225 | | #[instrument( |
226 | | err, |
227 | | level = Level::ERROR, |
228 | | skip_all, |
229 | | fields(request = ?grpc_request.get_ref()) |
230 | | )] |
231 | | async fn publish_build_tool_event_stream( |
232 | | &self, |
233 | | grpc_request: Request<Streaming<PublishBuildToolEventStreamRequest>>, |
234 | 4 | ) -> Result<Response<Self::PublishBuildToolEventStreamStream>, Status> { |
235 | 2 | self.inner_publish_build_tool_event_stream(grpc_request.into_inner(), get_identity()) |
236 | 2 | .await |
237 | 2 | .map_err(Error::into) |
238 | 4 | } |
239 | | } |