/build/source/nativelink-service/src/ac_server.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::collections::HashMap; |
16 | | use std::convert::Into; |
17 | | use std::fmt::Debug; |
18 | | |
19 | | use bytes::BytesMut; |
20 | | use nativelink_config::cas_server::{AcStoreConfig, InstanceName}; |
21 | | use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; |
22 | | use nativelink_proto::build::bazel::remote::execution::v2::action_cache_server::{ |
23 | | ActionCache, ActionCacheServer as Server, |
24 | | }; |
25 | | use nativelink_proto::build::bazel::remote::execution::v2::{ |
26 | | ActionResult, GetActionResultRequest, UpdateActionResultRequest, |
27 | | }; |
28 | | use nativelink_store::ac_utils::{get_and_decode_digest, ESTIMATED_DIGEST_SIZE}; |
29 | | use nativelink_store::grpc_store::GrpcStore; |
30 | | use nativelink_store::store_manager::StoreManager; |
31 | | use nativelink_util::common::DigestInfo; |
32 | | use nativelink_util::digest_hasher::make_ctx_for_hash_func; |
33 | | use nativelink_util::origin_event::OriginEventContext; |
34 | | use nativelink_util::store_trait::{Store, StoreLike}; |
35 | | use prost::Message; |
36 | | use tonic::{Request, Response, Status}; |
37 | | use tracing::{error_span, event, instrument, Level}; |
38 | | |
39 | | #[derive(Clone)] |
40 | | pub struct AcStoreInfo { |
41 | | store: Store, |
42 | | read_only: bool, |
43 | | } |
44 | | |
45 | | pub struct AcServer { |
46 | | stores: HashMap<String, AcStoreInfo>, |
47 | | } |
48 | | |
49 | | impl Debug for AcServer { |
50 | 0 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
51 | 0 | f.debug_struct("AcServer").finish() |
52 | 0 | } |
53 | | } |
54 | | |
55 | | impl AcServer { |
56 | 4 | pub fn new( |
57 | 4 | config: &HashMap<InstanceName, AcStoreConfig>, |
58 | 4 | store_manager: &StoreManager, |
59 | 4 | ) -> Result<Self, Error> { |
60 | 4 | let mut stores = HashMap::with_capacity(config.len()); |
61 | 8 | for (instance_name, ac_cfg4 ) in config { |
62 | 4 | let store = store_manager.get_store(&ac_cfg.ac_store).ok_or_else(|| { |
63 | 0 | make_input_err!("'ac_store': '{}' does not exist", ac_cfg.ac_store) |
64 | 4 | })?0 ; |
65 | 4 | stores.insert( |
66 | 4 | instance_name.to_string(), |
67 | 4 | AcStoreInfo { |
68 | 4 | store, |
69 | 4 | read_only: ac_cfg.read_only, |
70 | 4 | }, |
71 | 4 | ); |
72 | | } |
73 | 4 | Ok(AcServer { |
74 | 4 | stores: stores.clone(), |
75 | 4 | }) |
76 | 4 | } |
77 | | |
78 | 0 | pub fn into_service(self) -> Server<AcServer> { |
79 | 0 | Server::new(self) |
80 | 0 | } |
81 | | |
82 | 3 | async fn inner_get_action_result( |
83 | 3 | &self, |
84 | 3 | request: GetActionResultRequest, |
85 | 3 | ) -> Result<Response<ActionResult>, Error> { |
86 | 3 | let instance_name = &request.instance_name; |
87 | 3 | let store_info = self |
88 | 3 | .stores |
89 | 3 | .get(instance_name) |
90 | 3 | .err_tip(|| format!("'instance_name' not configured for '{instance_name}'")0 )?0 ; |
91 | | |
92 | | // TODO(blaise.bruer) We should write a test for these errors. |
93 | 3 | let digest: DigestInfo = request |
94 | 3 | .action_digest |
95 | 3 | .clone() |
96 | 3 | .err_tip(|| "Action digest was not set in message"0 )?0 |
97 | 3 | .try_into()?0 ; |
98 | | |
99 | | // If we are a GrpcStore we shortcut here, as this is a special store. |
100 | 3 | if let Some(grpc_store0 ) = store_info Branch (100:16): [True: 0, False: 3]
Branch (100:16): [Folded - Ignored]
|
101 | 3 | .store |
102 | 3 | .downcast_ref::<GrpcStore>(Some(digest.into())) |
103 | | { |
104 | 0 | return grpc_store.get_action_result(Request::new(request)).await; |
105 | 3 | } |
106 | | |
107 | 3 | let res = get_and_decode_digest::<ActionResult>(&store_info.store, digest.into()).await; |
108 | 3 | match res { |
109 | 1 | Ok(action_result) => Ok(Response::new(action_result)), |
110 | 2 | Err(mut e) => { |
111 | 2 | if e.code == Code::NotFound { Branch (111:20): [True: 2, False: 0]
Branch (111:20): [Folded - Ignored]
|
112 | 2 | // `get_action_result` is frequent to get NotFound errors, so remove all |
113 | 2 | // messages to save space. |
114 | 2 | e.messages.clear(); |
115 | 2 | }0 |
116 | 2 | Err(e) |
117 | | } |
118 | | } |
119 | 3 | } |
120 | | |
121 | 1 | async fn inner_update_action_result( |
122 | 1 | &self, |
123 | 1 | request: UpdateActionResultRequest, |
124 | 1 | ) -> Result<Response<ActionResult>, Error> { |
125 | 1 | let instance_name = &request.instance_name; |
126 | 1 | let store_info = self |
127 | 1 | .stores |
128 | 1 | .get(instance_name) |
129 | 1 | .err_tip(|| format!("'instance_name' not configured for '{instance_name}'")0 )?0 ; |
130 | | |
131 | 1 | if store_info.read_only { Branch (131:12): [True: 0, False: 1]
Branch (131:12): [Folded - Ignored]
|
132 | 0 | return Err(make_err!( |
133 | 0 | Code::PermissionDenied, |
134 | 0 | "The store '{instance_name}' is read only on this endpoint", |
135 | 0 | )); |
136 | 1 | } |
137 | | |
138 | 1 | let digest: DigestInfo = request |
139 | 1 | .action_digest |
140 | 1 | .clone() |
141 | 1 | .err_tip(|| "Action digest was not set in message"0 )?0 |
142 | 1 | .try_into()?0 ; |
143 | | |
144 | | // If we are a GrpcStore we shortcut here, as this is a special store. |
145 | 1 | if let Some(grpc_store0 ) = store_info Branch (145:16): [True: 0, False: 1]
Branch (145:16): [Folded - Ignored]
|
146 | 1 | .store |
147 | 1 | .downcast_ref::<GrpcStore>(Some(digest.into())) |
148 | | { |
149 | 0 | return grpc_store.update_action_result(Request::new(request)).await; |
150 | 1 | } |
151 | | |
152 | 1 | let action_result = request |
153 | 1 | .action_result |
154 | 1 | .err_tip(|| "Action result was not set in message"0 )?0 ; |
155 | | |
156 | 1 | let mut store_data = BytesMut::with_capacity(ESTIMATED_DIGEST_SIZE); |
157 | 1 | action_result |
158 | 1 | .encode(&mut store_data) |
159 | 1 | .err_tip(|| "Provided ActionResult could not be serialized"0 )?0 ; |
160 | | |
161 | 1 | store_info |
162 | 1 | .store |
163 | 1 | .update_oneshot(digest, store_data.freeze()) |
164 | 1 | .await |
165 | 1 | .err_tip(|| "Failed to update in action cache"0 )?0 ; |
166 | 1 | Ok(Response::new(action_result)) |
167 | 1 | } |
168 | | } |
169 | | |
170 | | #[tonic::async_trait] |
171 | | impl ActionCache for AcServer { |
172 | | #[allow(clippy::blocks_in_conditions)] |
173 | | #[instrument( |
174 | | ret(level = Level::INFO), |
175 | | level = Level::ERROR, |
176 | | skip_all, |
177 | | fields(request = ?grpc_request.get_ref()) |
178 | | )] |
179 | | async fn get_action_result( |
180 | | &self, |
181 | | grpc_request: Request<GetActionResultRequest>, |
182 | 3 | ) -> Result<Response<ActionResult>, Status> { |
183 | 3 | let request = grpc_request.into_inner(); |
184 | 3 | let ctx = OriginEventContext::new(|| &request0 ).await; |
185 | | |
186 | 3 | let resp = make_ctx_for_hash_func(request.digest_function) |
187 | 3 | .err_tip(|| "In AcServer::get_action_result"0 )?0 |
188 | | .wrap_async( |
189 | 3 | error_span!("ac_server_get_action_result"), |
190 | 3 | self.inner_get_action_result(request), |
191 | 3 | ) |
192 | 3 | .await; |
193 | | |
194 | 3 | if resp.is_err() && resp.as_ref().err().unwrap().code != Code::NotFound2 { Branch (194:12): [True: 2, False: 1]
Branch (194:29): [True: 0, False: 2]
Branch (194:12): [Folded - Ignored]
Branch (194:29): [Folded - Ignored]
|
195 | 0 | event!(Level::ERROR, return = ?resp); |
196 | 3 | } |
197 | 3 | let resp = resp.map_err(Into::into); |
198 | 3 | ctx.emit(|| &resp0 ).await; |
199 | 3 | resp |
200 | 6 | } |
201 | | |
202 | | #[allow(clippy::blocks_in_conditions)] |
203 | | #[instrument( |
204 | | err, |
205 | | ret(level = Level::INFO), |
206 | | level = Level::ERROR, |
207 | | skip_all, |
208 | | fields(request = ?grpc_request.get_ref()) |
209 | | )] |
210 | | async fn update_action_result( |
211 | | &self, |
212 | | grpc_request: Request<UpdateActionResultRequest>, |
213 | 1 | ) -> Result<Response<ActionResult>, Status> { |
214 | 1 | let request = grpc_request.into_inner(); |
215 | 1 | let ctx = OriginEventContext::new(|| &request0 ).await; |
216 | 1 | let resp = make_ctx_for_hash_func(request.digest_function) |
217 | 1 | .err_tip(|| "In AcServer::update_action_result"0 )?0 |
218 | | .wrap_async( |
219 | 1 | error_span!("ac_server_update_action_result"), |
220 | 1 | self.inner_update_action_result(request), |
221 | 1 | ) |
222 | 1 | .await |
223 | 1 | .map_err(Into::into); |
224 | 1 | ctx.emit(|| &resp0 ).await; |
225 | 1 | resp |
226 | 2 | } |
227 | | } |