Coverage Report

Created: 2025-04-19 16:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-service/src/bep_server.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::pin::Pin;
17
18
use bytes::BytesMut;
19
use futures::Stream;
20
use futures::stream::unfold;
21
use nativelink_error::{Error, ResultExt};
22
use nativelink_proto::com::github::trace_machina::nativelink::events::{BepEvent, bep_event};
23
use nativelink_proto::google::devtools::build::v1::publish_build_event_server::{
24
    PublishBuildEvent, PublishBuildEventServer,
25
};
26
use nativelink_proto::google::devtools::build::v1::{
27
    PublishBuildToolEventStreamRequest, PublishBuildToolEventStreamResponse,
28
    PublishLifecycleEventRequest,
29
};
30
use nativelink_store::store_manager::StoreManager;
31
use nativelink_util::origin_context::{ActiveOriginContext, ORIGIN_IDENTITY};
32
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike};
33
use prost::Message;
34
use tonic::{Request, Response, Result, Status, Streaming};
35
use tracing::{Level, instrument};
36
37
/// Current version of the BEP event. This might be used in the future if
38
/// there is a breaking change in the BEP event format.
39
const BEP_EVENT_VERSION: u32 = 0;
40
41
#[allow(clippy::result_large_err, reason = "TODO Fix this. Breaks on nightly")]
42
2
fn get_identity() -> Result<Option<String>, Status> {
43
2
    ActiveOriginContext::get()
44
2
        .map_or(Ok(None), |ctx| ctx.get_value(&ORIGIN_IDENTITY))
45
2
        .err_tip(|| 
"In BepServer"0
)
46
2
        .map_or_else(|e| 
Err(e.into())0
, |v| Ok(v.map(|v|
v.as_ref().clone()0
)))
47
2
}
48
49
#[derive(Debug)]
50
pub struct BepServer {
51
    store: Store,
52
}
53
54
impl BepServer {
55
2
    pub fn new(
56
2
        config: &nativelink_config::cas_server::BepConfig,
57
2
        store_manager: &StoreManager,
58
2
    ) -> Result<Self, Error> {
59
2
        let store = store_manager
60
2
            .get_store(&config.store)
61
2
            .err_tip(|| 
format!("Expected store {} to exist in store manager", &config.store)0
)
?0
;
62
63
2
        Ok(Self { store })
64
2
    }
65
66
0
    pub fn into_service(self) -> PublishBuildEventServer<BepServer> {
67
0
        PublishBuildEventServer::new(self)
68
0
    }
69
70
1
    async fn inner_publish_lifecycle_event(
71
1
        &self,
72
1
        request: PublishLifecycleEventRequest,
73
1
        identity: Option<String>,
74
1
    ) -> Result<Response<()>, Error> {
75
1
        let build_event = request
76
1
            .build_event
77
1
            .as_ref()
78
1
            .err_tip(|| 
"Expected build_event to be set"0
)
?0
;
79
1
        let stream_id = build_event
80
1
            .stream_id
81
1
            .as_ref()
82
1
            .err_tip(|| 
"Expected stream_id to be set"0
)
?0
;
83
84
1
        let sequence_number = build_event.sequence_number;
85
1
86
1
        let store_key = StoreKey::Str(Cow::Owned(format!(
87
1
            "BepEvent:le:{}:{}:{}",
88
1
            &stream_id.build_id, &stream_id.invocation_id, sequence_number,
89
1
        )));
90
1
91
1
        let bep_event = BepEvent {
92
1
            version: BEP_EVENT_VERSION,
93
1
            identity: identity.unwrap_or_default(),
94
1
            event: Some(bep_event::Event::LifecycleEvent(request)),
95
1
        };
96
1
        let mut buf = BytesMut::new();
97
1
        bep_event
98
1
            .encode(&mut buf)
99
1
            .err_tip(|| 
"Could not encode PublishLifecycleEventRequest proto"0
)
?0
;
100
101
1
        self.store
102
1
            .update_oneshot(store_key, buf.freeze())
103
1
            .await
104
1
            .err_tip(|| 
"Failed to store PublishLifecycleEventRequest"0
)
?0
;
105
106
1
        Ok(Response::new(()))
107
1
    }
108
109
1
    async fn inner_publish_build_tool_event_stream(
110
1
        &self,
111
1
        stream: Streaming<PublishBuildToolEventStreamRequest>,
112
1
        identity: Option<String>,
113
1
    ) -> Result<Response<PublishBuildToolEventStreamStream>, Error> {
114
5
        async fn process_request(
115
5
            store: Pin<&dyn StoreDriver>,
116
5
            request: PublishBuildToolEventStreamRequest,
117
5
            identity: String,
118
5
        ) -> Result<PublishBuildToolEventStreamResponse, Status> {
119
5
            let ordered_build_event = request
120
5
                .ordered_build_event
121
5
                .as_ref()
122
5
                .err_tip(|| 
"Expected ordered_build_event to be set"0
)
?0
;
123
5
            let stream_id = ordered_build_event
124
5
                .stream_id
125
5
                .as_ref()
126
5
                .err_tip(|| 
"Expected stream_id to be set"0
)
?0
127
5
                .clone();
128
5
129
5
            let sequence_number = ordered_build_event.sequence_number;
130
5
131
5
            let bep_event = BepEvent {
132
5
                version: BEP_EVENT_VERSION,
133
5
                identity,
134
5
                event: Some(bep_event::Event::BuildToolEvent(request)),
135
5
            };
136
5
            let mut buf = BytesMut::new();
137
5
138
5
            bep_event
139
5
                .encode(&mut buf)
140
5
                .err_tip(|| 
"Could not encode PublishBuildToolEventStreamRequest proto"0
)
?0
;
141
142
5
            store
143
5
                .update_oneshot(
144
5
                    StoreKey::Str(Cow::Owned(format!(
145
5
                        "BepEvent:be:{}:{}:{}",
146
5
                        &stream_id.build_id, &stream_id.invocation_id, sequence_number,
147
5
                    ))),
148
5
                    buf.freeze(),
149
5
                )
150
5
                .await
151
5
                .err_tip(|| 
"Failed to store PublishBuildToolEventStreamRequest"0
)
?0
;
152
153
5
            Ok(PublishBuildToolEventStreamResponse {
154
5
                stream_id: Some(stream_id.clone()),
155
5
                sequence_number,
156
5
            })
157
5
        }
158
159
        struct State {
160
            store: Store,
161
            stream: Streaming<PublishBuildToolEventStreamRequest>,
162
            identity: String,
163
        }
164
165
1
        let response_stream =
166
1
            unfold(
167
1
                Some(State {
168
1
                    store: self.store.clone(),
169
1
                    stream,
170
1
                    identity: identity.unwrap_or_default(),
171
1
                }),
172
5
                move |maybe_state| async move {
173
5
                    let mut state = maybe_state
?0
;
174
5
                    let request =
175
5
                        match state.stream.message().await.err_tip(
176
0
                            || "While receiving message in publish_build_tool_event_stream",
177
                        ) {
178
5
                            Ok(Some(request)) => request,
179
0
                            Ok(None) => return None,
180
0
                            Err(e) => return Some((Err(e.into()), None)),
181
                        };
182
5
                    process_request(
183
5
                        state.store.as_store_driver_pin(),
184
5
                        request,
185
5
                        state.identity.clone(),
186
5
                    )
187
5
                    .await
188
5
                    .map_or_else(
189
0
                        |e| Some((Err(e), None)),
190
5
                        |response| Some((Ok(response), Some(state))),
191
                    )
192
10
                },
193
            );
194
195
1
        Ok(Response::new(Box::pin(response_stream)))
196
1
    }
197
}
198
199
type PublishBuildToolEventStreamStream = Pin<
200
    Box<dyn Stream<Item = Result<PublishBuildToolEventStreamResponse, Status>> + Send + 'static>,
201
>;
202
203
#[tonic::async_trait]
204
impl PublishBuildEvent for BepServer {
205
    type PublishBuildToolEventStreamStream = PublishBuildToolEventStreamStream;
206
207
    #[instrument(
208
        err,
209
        ret(level = Level::INFO),
210
        level = Level::ERROR,
211
        skip_all,
212
        fields(request = ?grpc_request.get_ref())
213
    )]
214
    async fn publish_lifecycle_event(
215
        &self,
216
        grpc_request: Request<PublishLifecycleEventRequest>,
217
2
    ) -> Result<Response<()>, Status> {
218
1
        self.inner_publish_lifecycle_event(grpc_request.into_inner(), get_identity()
?0
)
219
1
            .await
220
1
            .map_err(Error::into)
221
2
    }
222
223
    #[instrument(
224
      err,
225
      level = Level::ERROR,
226
      skip_all,
227
      fields(request = ?grpc_request.get_ref())
228
    )]
229
    async fn publish_build_tool_event_stream(
230
        &self,
231
        grpc_request: Request<Streaming<PublishBuildToolEventStreamRequest>>,
232
2
    ) -> Result<Response<Self::PublishBuildToolEventStreamStream>, Status> {
233
1
        self.inner_publish_build_tool_event_stream(grpc_request.into_inner(), get_identity()
?0
)
234
1
            .await
235
1
            .map_err(Error::into)
236
2
    }
237
}