Coverage Report

Created: 2024-11-22 20:17

/build/source/nativelink-service/src/bep_server.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::pin::Pin;
17
18
use bytes::BytesMut;
19
use futures::stream::unfold;
20
use futures::Stream;
21
use nativelink_error::{Error, ResultExt};
22
use nativelink_proto::google::devtools::build::v1::publish_build_event_server::{
23
    PublishBuildEvent, PublishBuildEventServer,
24
};
25
use nativelink_proto::google::devtools::build::v1::{
26
    PublishBuildToolEventStreamRequest, PublishBuildToolEventStreamResponse,
27
    PublishLifecycleEventRequest,
28
};
29
use nativelink_store::store_manager::StoreManager;
30
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike};
31
use prost::Message;
32
use tonic::{Request, Response, Result, Status, Streaming};
33
use tracing::{instrument, Level};
34
35
pub struct BepServer {
36
    store: Store,
37
}
38
39
impl BepServer {
40
2
    pub fn new(
41
2
        config: &nativelink_config::cas_server::BepConfig,
42
2
        store_manager: &StoreManager,
43
2
    ) -> Result<Self, Error> {
44
2
        let store = store_manager
45
2
            .get_store(&config.store)
46
2
            .err_tip(|| 
format!("Expected store {} to exist in store manager", &config.store)0
)
?0
;
47
48
2
        Ok(Self { store })
49
2
    }
50
51
0
    pub fn into_service(self) -> PublishBuildEventServer<BepServer> {
52
0
        PublishBuildEventServer::new(self)
53
0
    }
54
55
1
    async fn inner_publish_lifecycle_event(
56
1
        &self,
57
1
        request: PublishLifecycleEventRequest,
58
1
    ) -> Result<Response<()>, Error> {
59
1
        let build_event = request
60
1
            .build_event
61
1
            .as_ref()
62
1
            .err_tip(|| 
"Expected build_event to be set"0
)
?0
;
63
1
        let stream_id = build_event
64
1
            .stream_id
65
1
            .as_ref()
66
1
            .err_tip(|| 
"Expected stream_id to be set"0
)
?0
;
67
68
1
        let sequence_number = build_event.sequence_number;
69
1
70
1
        let mut buf = BytesMut::new();
71
1
        request
72
1
            .encode(&mut buf)
73
1
            .err_tip(|| 
"Could not encode PublishLifecycleEventRequest proto"0
)
?0
;
74
75
1
        self.store
76
1
            .update_oneshot(
77
1
                StoreKey::Str(Cow::Owned(format!(
78
1
                    "LifecycleEvent:{}:{}:{}",
79
1
                    &stream_id.build_id, &stream_id.invocation_id, sequence_number,
80
1
                ))),
81
1
                buf.freeze(),
82
1
            )
83
0
            .await
84
1
            .err_tip(|| 
"Failed to store PublishLifecycleEventRequest"0
)
?0
;
85
86
1
        Ok(Response::new(()))
87
1
    }
88
89
1
    async fn inner_publish_build_tool_event_stream(
90
1
        &self,
91
1
        stream: Streaming<PublishBuildToolEventStreamRequest>,
92
1
    ) -> Result<Response<PublishBuildToolEventStreamStream>, Error> {
93
5
        async fn process_request(
94
5
            store: Pin<&dyn StoreDriver>,
95
5
            request: PublishBuildToolEventStreamRequest,
96
5
        ) -> Result<PublishBuildToolEventStreamResponse, Status> {
97
5
            let ordered_build_event = request
98
5
                .ordered_build_event
99
5
                .as_ref()
100
5
                .err_tip(|| 
"Expected ordered_build_event to be set"0
)
?0
;
101
5
            let stream_id = ordered_build_event
102
5
                .stream_id
103
5
                .as_ref()
104
5
                .err_tip(|| 
"Expected stream_id to be set"0
)
?0
;
105
106
5
            let sequence_number = ordered_build_event.sequence_number;
107
5
108
5
            let mut buf = BytesMut::new();
109
5
110
5
            request
111
5
                .encode(&mut buf)
112
5
                .err_tip(|| 
"Could not encode PublishBuildToolEventStreamRequest proto"0
)
?0
;
113
114
5
            store
115
5
                .update_oneshot(
116
5
                    StoreKey::Str(Cow::Owned(format!(
117
5
                        "BuildToolEventStream:{}:{}:{}",
118
5
                        &stream_id.build_id, &stream_id.invocation_id, sequence_number,
119
5
                    ))),
120
5
                    buf.freeze(),
121
5
                )
122
0
                .await
123
5
                .err_tip(|| 
"Failed to store PublishBuildToolEventStreamRequest"0
)
?0
;
124
125
5
            Ok(PublishBuildToolEventStreamResponse {
126
5
                stream_id: Some(stream_id.clone()),
127
5
                sequence_number,
128
5
            })
129
5
        }
130
131
        struct State {
132
            store: Store,
133
            stream: Streaming<PublishBuildToolEventStreamRequest>,
134
        }
135
136
1
        let response_stream =
137
1
            unfold(
138
1
                Some(State {
139
1
                    store: self.store.clone(),
140
1
                    stream,
141
1
                }),
142
5
                move |maybe_state| async move {
143
5
                    let mut state = maybe_state
?0
;
144
5
                    let request =
145
5
                        match state.stream.message().
await0
.err_tip(|| {
146
0
                            "While receiving message in publish_build_tool_event_stream"
147
5
                        }) {
148
5
                            Ok(Some(request)) => request,
149
0
                            Ok(None) => return None,
150
0
                            Err(e) => return Some((Err(e.into()), None)),
151
                        };
152
5
                    process_request(state.store.as_store_driver_pin(), request)
153
0
                        .await
154
5
                        .map_or_else(
155
5
                            |e| 
Some((Err(e), None))0
,
156
5
                            |response| Some((Ok(response), Some(state))),
157
5
                        )
158
10
                },
159
1
            );
160
1
161
1
        Ok(Response::new(Box::pin(response_stream)))
162
1
    }
163
}
164
165
type PublishBuildToolEventStreamStream = Pin<
166
    Box<dyn Stream<Item = Result<PublishBuildToolEventStreamResponse, Status>> + Send + 'static>,
167
>;
168
169
#[tonic::async_trait]
170
impl PublishBuildEvent for BepServer {
171
    type PublishBuildToolEventStreamStream = PublishBuildToolEventStreamStream;
172
173
    #[allow(clippy::blocks_in_conditions)]
174
1
    #[instrument(
175
        err,
176
        ret(level = Level::INFO),
177
        level = Level::ERROR,
178
        skip_all,
179
        fields(request = ?grpc_request.get_ref())
180
2
    )]
181
    async fn publish_lifecycle_event(
182
        &self,
183
        grpc_request: Request<PublishLifecycleEventRequest>,
184
1
    ) -> Result<Response<()>, Status> {
185
1
        self.inner_publish_lifecycle_event(grpc_request.into_inner())
186
0
            .await
187
1
            .map_err(Error::into)
188
2
    }
189
190
    #[allow(clippy::blocks_in_conditions)]
191
1
    #[instrument(
192
      err,
193
      level = Level::ERROR,
194
      skip_all,
195
      fields(request = ?grpc_request.get_ref())
196
2
    )]
197
    async fn publish_build_tool_event_stream(
198
        &self,
199
        grpc_request: Request<Streaming<PublishBuildToolEventStreamRequest>>,
200
1
    ) -> Result<Response<Self::PublishBuildToolEventStreamStream>, Status> {
201
1
        self.inner_publish_build_tool_event_stream(grpc_request.into_inner())
202
0
            .await
203
1
            .map_err(Error::into)
204
2
    }
205
}