Coverage Report

Created: 2025-06-24 08:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-service/src/bep_server.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use std::borrow::Cow;
17
18
use bytes::BytesMut;
19
use futures::Stream;
20
use futures::stream::unfold;
21
use nativelink_error::{Error, ResultExt};
22
use nativelink_proto::com::github::trace_machina::nativelink::events::{BepEvent, bep_event};
23
use nativelink_proto::google::devtools::build::v1::publish_build_event_server::{
24
    PublishBuildEvent, PublishBuildEventServer,
25
};
26
use nativelink_proto::google::devtools::build::v1::{
27
    PublishBuildToolEventStreamRequest, PublishBuildToolEventStreamResponse,
28
    PublishLifecycleEventRequest,
29
};
30
use nativelink_store::store_manager::StoreManager;
31
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike};
32
use opentelemetry::baggage::BaggageExt;
33
use opentelemetry::context::Context;
34
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
35
use prost::Message;
36
use tonic::{Request, Response, Result, Status, Streaming};
37
use tracing::{Level, instrument};
38
39
/// Current version of the BEP event. This might be used in the future if
40
/// there is a breaking change in the BEP event format.
41
const BEP_EVENT_VERSION: u32 = 0;
42
43
#[allow(clippy::result_large_err, reason = "TODO Fix this. Breaks on nightly")]
44
3
fn get_identity() -> Option<String> {
45
3
    Context::current()
46
3
        .baggage()
47
3
        .get(ENDUSER_ID)
48
3
        .map(|value| 
value.as_str()0
.
to_string0
())
49
3
}
50
51
#[derive(Debug)]
52
pub struct BepServer {
53
    store: Store,
54
}
55
56
impl BepServer {
57
3
    pub fn new(
58
3
        config: &nativelink_config::cas_server::BepConfig,
59
3
        store_manager: &StoreManager,
60
3
    ) -> Result<Self, Error> {
61
3
        let store = store_manager
62
3
            .get_store(&config.store)
63
3
            .err_tip(|| format!(
"Expected store {} to exist in store manager"0
,
&config.store0
))
?0
;
64
65
3
        Ok(Self { store })
66
3
    }
67
68
0
    pub fn into_service(self) -> PublishBuildEventServer<Self> {
69
0
        PublishBuildEventServer::new(self)
70
0
    }
71
72
1
    async fn inner_publish_lifecycle_event(
73
1
        &self,
74
1
        request: PublishLifecycleEventRequest,
75
1
        identity: Option<String>,
76
1
    ) -> Result<Response<()>, Error> {
77
1
        let build_event = request
78
1
            .build_event
79
1
            .as_ref()
80
1
            .err_tip(|| "Expected build_event to be set")
?0
;
81
1
        let stream_id = build_event
82
1
            .stream_id
83
1
            .as_ref()
84
1
            .err_tip(|| "Expected stream_id to be set")
?0
;
85
86
1
        let sequence_number = build_event.sequence_number;
87
88
1
        let store_key = StoreKey::Str(Cow::Owned(format!(
89
1
            "BepEvent:le:{}:{}:{}",
90
1
            &stream_id.build_id, &stream_id.invocation_id, sequence_number,
91
1
        )));
92
93
1
        let bep_event = BepEvent {
94
1
            version: BEP_EVENT_VERSION,
95
1
            identity: identity.unwrap_or_default(),
96
1
            event: Some(bep_event::Event::LifecycleEvent(request)),
97
1
        };
98
1
        let mut buf = BytesMut::new();
99
1
        bep_event
100
1
            .encode(&mut buf)
101
1
            .err_tip(|| "Could not encode PublishLifecycleEventRequest proto")
?0
;
102
103
1
        self.store
104
1
            .update_oneshot(store_key, buf.freeze())
105
1
            .await
106
1
            .err_tip(|| "Failed to store PublishLifecycleEventRequest")
?0
;
107
108
1
        Ok(Response::new(()))
109
1
    }
110
111
2
    async fn inner_publish_build_tool_event_stream(
112
2
        &self,
113
2
        stream: Streaming<PublishBuildToolEventStreamRequest>,
114
2
        identity: Option<String>,
115
2
    ) -> Result<Response<PublishBuildToolEventStreamStream>, Error> {
116
6
        async fn process_request(
117
6
            store: Pin<&dyn StoreDriver>,
118
6
            request: PublishBuildToolEventStreamRequest,
119
6
            identity: String,
120
6
        ) -> Result<PublishBuildToolEventStreamResponse, Status> {
121
6
            let ordered_build_event = request
122
6
                .ordered_build_event
123
6
                .as_ref()
124
6
                .err_tip(|| "Expected ordered_build_event to be set")
?0
;
125
6
            let stream_id = ordered_build_event
126
6
                .stream_id
127
6
                .as_ref()
128
6
                .err_tip(|| "Expected stream_id to be set")
?0
129
6
                .clone();
130
131
6
            let sequence_number = ordered_build_event.sequence_number;
132
133
6
            let bep_event = BepEvent {
134
6
                version: BEP_EVENT_VERSION,
135
6
                identity,
136
6
                event: Some(bep_event::Event::BuildToolEvent(request)),
137
6
            };
138
6
            let mut buf = BytesMut::new();
139
140
6
            bep_event
141
6
                .encode(&mut buf)
142
6
                .err_tip(|| "Could not encode PublishBuildToolEventStreamRequest proto")
?0
;
143
144
6
            store
145
6
                .update_oneshot(
146
6
                    StoreKey::Str(Cow::Owned(format!(
147
6
                        "BepEvent:be:{}:{}:{}",
148
6
                        &stream_id.build_id, &stream_id.invocation_id, sequence_number,
149
6
                    ))),
150
6
                    buf.freeze(),
151
6
                )
152
6
                .await
153
6
                .err_tip(|| "Failed to store PublishBuildToolEventStreamRequest")
?0
;
154
155
6
            Ok(PublishBuildToolEventStreamResponse {
156
6
                stream_id: Some(stream_id.clone()),
157
6
                sequence_number,
158
6
            })
159
6
        }
160
161
        struct State {
162
            store: Store,
163
            stream: Streaming<PublishBuildToolEventStreamRequest>,
164
            identity: String,
165
        }
166
167
2
        let response_stream =
168
2
            unfold(
169
2
                Some(State {
170
2
                    store: self.store.clone(),
171
2
                    stream,
172
2
                    identity: identity.unwrap_or_default(),
173
2
                }),
174
7
                move |maybe_state| async move {
175
7
                    let mut state = maybe_state
?0
;
176
6
                    let request =
177
7
                        match state.stream.message().await.err_tip(
178
                            || "While receiving message in publish_build_tool_event_stream",
179
                        ) {
180
6
                            Ok(Some(request)) => request,
181
1
                            Ok(None) => return None,
182
0
                            Err(e) => return Some((Err(e.into()), None)),
183
                        };
184
6
                    process_request(
185
6
                        state.store.as_store_driver_pin(),
186
6
                        request,
187
6
                        state.identity.clone(),
188
6
                    )
189
6
                    .await
190
6
                    .map_or_else(
191
0
                        |e| Some((Err(e), None)),
192
6
                        |response| Some((Ok(response), Some(state))),
193
                    )
194
14
                },
195
            );
196
197
2
        Ok(Response::new(Box::pin(response_stream)))
198
2
    }
199
}
200
201
type PublishBuildToolEventStreamStream = Pin<
202
    Box<dyn Stream<Item = Result<PublishBuildToolEventStreamResponse, Status>> + Send + 'static>,
203
>;
204
205
#[tonic::async_trait]
206
impl PublishBuildEvent for BepServer {
207
    type PublishBuildToolEventStreamStream = PublishBuildToolEventStreamStream;
208
209
    #[instrument(
210
        err,
211
        ret(level = Level::INFO),
212
        level = Level::ERROR,
213
        skip_all,
214
        fields(request = ?grpc_request.get_ref())
215
    )]
216
    async fn publish_lifecycle_event(
217
        &self,
218
        grpc_request: Request<PublishLifecycleEventRequest>,
219
2
    ) -> Result<Response<()>, Status> {
220
1
        self.inner_publish_lifecycle_event(grpc_request.into_inner(), get_identity())
221
1
            .await
222
1
            .map_err(Error::into)
223
2
    }
224
225
    #[instrument(
226
      err,
227
      level = Level::ERROR,
228
      skip_all,
229
      fields(request = ?grpc_request.get_ref())
230
    )]
231
    async fn publish_build_tool_event_stream(
232
        &self,
233
        grpc_request: Request<Streaming<PublishBuildToolEventStreamRequest>>,
234
4
    ) -> Result<Response<Self::PublishBuildToolEventStreamStream>, Status> {
235
2
        self.inner_publish_build_tool_event_stream(grpc_request.into_inner(), get_identity())
236
2
            .await
237
2
            .map_err(Error::into)
238
4
    }
239
}