Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-service/src/ac_server.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
use std::convert::Into;
17
use std::fmt::Debug;
18
19
use bytes::BytesMut;
20
use nativelink_config::cas_server::{AcStoreConfig, InstanceName};
21
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
22
use nativelink_proto::build::bazel::remote::execution::v2::action_cache_server::{
23
    ActionCache, ActionCacheServer as Server,
24
};
25
use nativelink_proto::build::bazel::remote::execution::v2::{
26
    ActionResult, GetActionResultRequest, UpdateActionResultRequest,
27
};
28
use nativelink_store::ac_utils::{get_and_decode_digest, ESTIMATED_DIGEST_SIZE};
29
use nativelink_store::grpc_store::GrpcStore;
30
use nativelink_store::store_manager::StoreManager;
31
use nativelink_util::common::DigestInfo;
32
use nativelink_util::digest_hasher::make_ctx_for_hash_func;
33
use nativelink_util::store_trait::{Store, StoreLike};
34
use prost::Message;
35
use tonic::{Request, Response, Status};
36
use tracing::{error_span, event, instrument, Level};
37
38
#[derive(Clone)]
39
pub struct AcStoreInfo {
40
    store: Store,
41
    read_only: bool,
42
}
43
44
pub struct AcServer {
45
    stores: HashMap<String, AcStoreInfo>,
46
}
47
48
impl Debug for AcServer {
49
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50
0
        f.debug_struct("AcServer").finish()
51
0
    }
52
}
53
54
impl AcServer {
55
4
    pub fn new(
56
4
        config: &HashMap<InstanceName, AcStoreConfig>,
57
4
        store_manager: &StoreManager,
58
4
    ) -> Result<Self, Error> {
59
4
        let mut stores = HashMap::with_capacity(config.len());
60
8
        for (
instance_name, ac_cfg4
) in config {
61
4
            let store = store_manager.get_store(&ac_cfg.ac_store).ok_or_else(|| {
62
0
                make_input_err!("'ac_store': '{}' does not exist", ac_cfg.ac_store)
63
4
            })
?0
;
64
4
            stores.insert(
65
4
                instance_name.to_string(),
66
4
                AcStoreInfo {
67
4
                    store,
68
4
                    read_only: ac_cfg.read_only,
69
4
                },
70
4
            );
71
        }
72
4
        Ok(AcServer {
73
4
            stores: stores.clone(),
74
4
        })
75
4
    }
76
77
0
    pub fn into_service(self) -> Server<AcServer> {
78
0
        Server::new(self)
79
0
    }
80
81
3
    async fn inner_get_action_result(
82
3
        &self,
83
3
        request: GetActionResultRequest,
84
3
    ) -> Result<Response<ActionResult>, Error> {
85
3
        let instance_name = &request.instance_name;
86
3
        let store_info = self
87
3
            .stores
88
3
            .get(instance_name)
89
3
            .err_tip(|| 
format!("'instance_name' not configured for '{instance_name}'")0
)
?0
;
90
91
        // TODO(blaise.bruer) We should write a test for these errors.
92
3
        let digest: DigestInfo = request
93
3
            .action_digest
94
3
            .clone()
95
3
            .err_tip(|| 
"Action digest was not set in message"0
)
?0
96
3
            .try_into()
?0
;
97
98
        // If we are a GrpcStore we shortcut here, as this is a special store.
99
3
        if let Some(
grpc_store0
) = store_info
  Branch (99:16): [True: 0, False: 3]
  Branch (99:16): [Folded - Ignored]
100
3
            .store
101
3
            .downcast_ref::<GrpcStore>(Some(digest.into()))
102
        {
103
0
            return grpc_store.get_action_result(Request::new(request)).await;
104
3
        }
105
3
106
3
        Ok(Response::new(
107
3
            get_and_decode_digest::<ActionResult>(&store_info.store, digest.into()).await
?2
,
108
        ))
109
3
    }
110
111
1
    async fn inner_update_action_result(
112
1
        &self,
113
1
        request: UpdateActionResultRequest,
114
1
    ) -> Result<Response<ActionResult>, Error> {
115
1
        let instance_name = &request.instance_name;
116
1
        let store_info = self
117
1
            .stores
118
1
            .get(instance_name)
119
1
            .err_tip(|| 
format!("'instance_name' not configured for '{instance_name}'")0
)
?0
;
120
121
1
        if store_info.read_only {
  Branch (121:12): [True: 0, False: 1]
  Branch (121:12): [Folded - Ignored]
122
0
            return Err(make_err!(
123
0
                Code::PermissionDenied,
124
0
                "The store '{instance_name}' is read only on this endpoint",
125
0
            ));
126
1
        }
127
128
1
        let digest: DigestInfo = request
129
1
            .action_digest
130
1
            .clone()
131
1
            .err_tip(|| 
"Action digest was not set in message"0
)
?0
132
1
            .try_into()
?0
;
133
134
        // If we are a GrpcStore we shortcut here, as this is a special store.
135
1
        if let Some(
grpc_store0
) = store_info
  Branch (135:16): [True: 0, False: 1]
  Branch (135:16): [Folded - Ignored]
136
1
            .store
137
1
            .downcast_ref::<GrpcStore>(Some(digest.into()))
138
        {
139
0
            return grpc_store.update_action_result(Request::new(request)).await;
140
1
        }
141
142
1
        let action_result = request
143
1
            .action_result
144
1
            .err_tip(|| 
"Action result was not set in message"0
)
?0
;
145
146
1
        let mut store_data = BytesMut::with_capacity(ESTIMATED_DIGEST_SIZE);
147
1
        action_result
148
1
            .encode(&mut store_data)
149
1
            .err_tip(|| 
"Provided ActionResult could not be serialized"0
)
?0
;
150
151
1
        store_info
152
1
            .store
153
1
            .update_oneshot(digest, store_data.freeze())
154
0
            .await
155
1
            .err_tip(|| 
"Failed to update in action cache"0
)
?0
;
156
1
        Ok(Response::new(action_result))
157
1
    }
158
}
159
160
#[tonic::async_trait]
161
impl ActionCache for AcServer {
162
    #[allow(clippy::blocks_in_conditions)]
163
3
    #[instrument(
164
        ret(level = Level::INFO),
165
        level = Level::ERROR,
166
        skip_all,
167
        fields(request = ?grpc_request.get_ref())
168
6
    )]
169
    async fn get_action_result(
170
        &self,
171
        grpc_request: Request<GetActionResultRequest>,
172
3
    ) -> Result<Response<ActionResult>, Status> {
173
3
        let request = grpc_request.into_inner();
174
3
        let resp = make_ctx_for_hash_func(request.digest_function)
175
3
            .err_tip(|| 
"In AcServer::get_action_result"0
)
?0
176
            .wrap_async(
177
3
                error_span!("ac_server_get_action_result"),
178
3
                self.inner_get_action_result(request),
179
            )
180
3
            .await;
181
182
        // let resp = self.inner_get_action_result(grpc_request).await;
183
3
        if resp.is_err() && 
resp.as_ref().err().unwrap().code != Code::NotFound2
{
  Branch (183:12): [True: 2, False: 1]
  Branch (183:29): [True: 0, False: 2]
  Branch (183:12): [Folded - Ignored]
  Branch (183:29): [Folded - Ignored]
184
0
            event!(Level::ERROR, return = ?resp);
185
3
        }
186
3
        return resp.map_err(Into::into);
187
6
    }
188
189
    #[allow(clippy::blocks_in_conditions)]
190
1
    #[instrument(
191
        err,
192
        ret(level = Level::INFO),
193
        level = Level::ERROR,
194
        skip_all,
195
        fields(request = ?grpc_request.get_ref())
196
2
    )]
197
    async fn update_action_result(
198
        &self,
199
        grpc_request: Request<UpdateActionResultRequest>,
200
1
    ) -> Result<Response<ActionResult>, Status> {
201
1
        let request = grpc_request.into_inner();
202
1
        make_ctx_for_hash_func(request.digest_function)
203
1
            .err_tip(|| 
"In AcServer::update_action_result"0
)
?0
204
            .wrap_async(
205
1
                error_span!("ac_server_update_action_result"),
206
1
                self.inner_update_action_result(request),
207
            )
208
0
            .await
209
1
            .map_err(Into::into)
210
2
    }
211
}