/build/source/nativelink-service/src/ac_server.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::collections::HashMap; |
16 | | use std::convert::Into; |
17 | | use std::fmt::Debug; |
18 | | |
19 | | use bytes::BytesMut; |
20 | | use nativelink_config::cas_server::{AcStoreConfig, InstanceName}; |
21 | | use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; |
22 | | use nativelink_proto::build::bazel::remote::execution::v2::action_cache_server::{ |
23 | | ActionCache, ActionCacheServer as Server, |
24 | | }; |
25 | | use nativelink_proto::build::bazel::remote::execution::v2::{ |
26 | | ActionResult, GetActionResultRequest, UpdateActionResultRequest, |
27 | | }; |
28 | | use nativelink_store::ac_utils::{get_and_decode_digest, ESTIMATED_DIGEST_SIZE}; |
29 | | use nativelink_store::grpc_store::GrpcStore; |
30 | | use nativelink_store::store_manager::StoreManager; |
31 | | use nativelink_util::common::DigestInfo; |
32 | | use nativelink_util::digest_hasher::make_ctx_for_hash_func; |
33 | | use nativelink_util::store_trait::{Store, StoreLike}; |
34 | | use prost::Message; |
35 | | use tonic::{Request, Response, Status}; |
36 | | use tracing::{error_span, event, instrument, Level}; |
37 | | |
38 | | #[derive(Clone)] |
39 | | pub struct AcStoreInfo { |
40 | | store: Store, |
41 | | read_only: bool, |
42 | | } |
43 | | |
44 | | pub struct AcServer { |
45 | | stores: HashMap<String, AcStoreInfo>, |
46 | | } |
47 | | |
48 | | impl Debug for AcServer { |
49 | 0 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
50 | 0 | f.debug_struct("AcServer").finish() |
51 | 0 | } |
52 | | } |
53 | | |
54 | | impl AcServer { |
55 | 4 | pub fn new( |
56 | 4 | config: &HashMap<InstanceName, AcStoreConfig>, |
57 | 4 | store_manager: &StoreManager, |
58 | 4 | ) -> Result<Self, Error> { |
59 | 4 | let mut stores = HashMap::with_capacity(config.len()); |
60 | 8 | for (instance_name, ac_cfg4 ) in config { |
61 | 4 | let store = store_manager.get_store(&ac_cfg.ac_store).ok_or_else(|| { |
62 | 0 | make_input_err!("'ac_store': '{}' does not exist", ac_cfg.ac_store) |
63 | 4 | })?0 ; |
64 | 4 | stores.insert( |
65 | 4 | instance_name.to_string(), |
66 | 4 | AcStoreInfo { |
67 | 4 | store, |
68 | 4 | read_only: ac_cfg.read_only, |
69 | 4 | }, |
70 | 4 | ); |
71 | | } |
72 | 4 | Ok(AcServer { |
73 | 4 | stores: stores.clone(), |
74 | 4 | }) |
75 | 4 | } |
76 | | |
77 | 0 | pub fn into_service(self) -> Server<AcServer> { |
78 | 0 | Server::new(self) |
79 | 0 | } |
80 | | |
81 | 3 | async fn inner_get_action_result( |
82 | 3 | &self, |
83 | 3 | request: GetActionResultRequest, |
84 | 3 | ) -> Result<Response<ActionResult>, Error> { |
85 | 3 | let instance_name = &request.instance_name; |
86 | 3 | let store_info = self |
87 | 3 | .stores |
88 | 3 | .get(instance_name) |
89 | 3 | .err_tip(|| format!("'instance_name' not configured for '{instance_name}'")0 )?0 ; |
90 | | |
91 | | // TODO(blaise.bruer) We should write a test for these errors. |
92 | 3 | let digest: DigestInfo = request |
93 | 3 | .action_digest |
94 | 3 | .clone() |
95 | 3 | .err_tip(|| "Action digest was not set in message"0 )?0 |
96 | 3 | .try_into()?0 ; |
97 | | |
98 | | // If we are a GrpcStore we shortcut here, as this is a special store. |
99 | 3 | if let Some(grpc_store0 ) = store_info Branch (99:16): [True: 0, False: 3]
Branch (99:16): [Folded - Ignored]
|
100 | 3 | .store |
101 | 3 | .downcast_ref::<GrpcStore>(Some(digest.into())) |
102 | | { |
103 | 0 | return grpc_store.get_action_result(Request::new(request)).await; |
104 | 3 | } |
105 | 3 | |
106 | 3 | Ok(Response::new( |
107 | 3 | get_and_decode_digest::<ActionResult>(&store_info.store, digest.into()).await?2 , |
108 | | )) |
109 | 3 | } |
110 | | |
111 | 1 | async fn inner_update_action_result( |
112 | 1 | &self, |
113 | 1 | request: UpdateActionResultRequest, |
114 | 1 | ) -> Result<Response<ActionResult>, Error> { |
115 | 1 | let instance_name = &request.instance_name; |
116 | 1 | let store_info = self |
117 | 1 | .stores |
118 | 1 | .get(instance_name) |
119 | 1 | .err_tip(|| format!("'instance_name' not configured for '{instance_name}'")0 )?0 ; |
120 | | |
121 | 1 | if store_info.read_only { Branch (121:12): [True: 0, False: 1]
Branch (121:12): [Folded - Ignored]
|
122 | 0 | return Err(make_err!( |
123 | 0 | Code::PermissionDenied, |
124 | 0 | "The store '{instance_name}' is read only on this endpoint", |
125 | 0 | )); |
126 | 1 | } |
127 | | |
128 | 1 | let digest: DigestInfo = request |
129 | 1 | .action_digest |
130 | 1 | .clone() |
131 | 1 | .err_tip(|| "Action digest was not set in message"0 )?0 |
132 | 1 | .try_into()?0 ; |
133 | | |
134 | | // If we are a GrpcStore we shortcut here, as this is a special store. |
135 | 1 | if let Some(grpc_store0 ) = store_info Branch (135:16): [True: 0, False: 1]
Branch (135:16): [Folded - Ignored]
|
136 | 1 | .store |
137 | 1 | .downcast_ref::<GrpcStore>(Some(digest.into())) |
138 | | { |
139 | 0 | return grpc_store.update_action_result(Request::new(request)).await; |
140 | 1 | } |
141 | | |
142 | 1 | let action_result = request |
143 | 1 | .action_result |
144 | 1 | .err_tip(|| "Action result was not set in message"0 )?0 ; |
145 | | |
146 | 1 | let mut store_data = BytesMut::with_capacity(ESTIMATED_DIGEST_SIZE); |
147 | 1 | action_result |
148 | 1 | .encode(&mut store_data) |
149 | 1 | .err_tip(|| "Provided ActionResult could not be serialized"0 )?0 ; |
150 | | |
151 | 1 | store_info |
152 | 1 | .store |
153 | 1 | .update_oneshot(digest, store_data.freeze()) |
154 | 0 | .await |
155 | 1 | .err_tip(|| "Failed to update in action cache"0 )?0 ; |
156 | 1 | Ok(Response::new(action_result)) |
157 | 1 | } |
158 | | } |
159 | | |
160 | | #[tonic::async_trait] |
161 | | impl ActionCache for AcServer { |
162 | | #[allow(clippy::blocks_in_conditions)] |
163 | 3 | #[instrument( |
164 | | ret(level = Level::INFO), |
165 | | level = Level::ERROR, |
166 | | skip_all, |
167 | | fields(request = ?grpc_request.get_ref()) |
168 | 6 | )] |
169 | | async fn get_action_result( |
170 | | &self, |
171 | | grpc_request: Request<GetActionResultRequest>, |
172 | 3 | ) -> Result<Response<ActionResult>, Status> { |
173 | 3 | let request = grpc_request.into_inner(); |
174 | 3 | let resp = make_ctx_for_hash_func(request.digest_function) |
175 | 3 | .err_tip(|| "In AcServer::get_action_result"0 )?0 |
176 | | .wrap_async( |
177 | 3 | error_span!("ac_server_get_action_result"), |
178 | 3 | self.inner_get_action_result(request), |
179 | | ) |
180 | 3 | .await; |
181 | | |
182 | | // let resp = self.inner_get_action_result(grpc_request).await; |
183 | 3 | if resp.is_err() && resp.as_ref().err().unwrap().code != Code::NotFound2 { Branch (183:12): [True: 2, False: 1]
Branch (183:29): [True: 0, False: 2]
Branch (183:12): [Folded - Ignored]
Branch (183:29): [Folded - Ignored]
|
184 | 0 | event!(Level::ERROR, return = ?resp); |
185 | 3 | } |
186 | 3 | return resp.map_err(Into::into); |
187 | 6 | } |
188 | | |
189 | | #[allow(clippy::blocks_in_conditions)] |
190 | 1 | #[instrument( |
191 | | err, |
192 | | ret(level = Level::INFO), |
193 | | level = Level::ERROR, |
194 | | skip_all, |
195 | | fields(request = ?grpc_request.get_ref()) |
196 | 2 | )] |
197 | | async fn update_action_result( |
198 | | &self, |
199 | | grpc_request: Request<UpdateActionResultRequest>, |
200 | 1 | ) -> Result<Response<ActionResult>, Status> { |
201 | 1 | let request = grpc_request.into_inner(); |
202 | 1 | make_ctx_for_hash_func(request.digest_function) |
203 | 1 | .err_tip(|| "In AcServer::update_action_result"0 )?0 |
204 | | .wrap_async( |
205 | 1 | error_span!("ac_server_update_action_result"), |
206 | 1 | self.inner_update_action_result(request), |
207 | | ) |
208 | 0 | .await |
209 | 1 | .map_err(Into::into) |
210 | 2 | } |
211 | | } |