/build/source/nativelink-service/src/cas_server.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::convert::Into; |
16 | | use core::pin::Pin; |
17 | | use core::time::Duration; |
18 | | use std::collections::{HashMap, VecDeque}; |
19 | | |
20 | | use bytes::Bytes; |
21 | | use futures::stream::{FuturesUnordered, Stream}; |
22 | | use futures::{StreamExt, TryStreamExt}; |
23 | | use nativelink_config::cas_server::{CasStoreConfig, WithInstanceName}; |
24 | | use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err}; |
25 | | use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_server::{ |
26 | | ContentAddressableStorage, ContentAddressableStorageServer as Server, |
27 | | }; |
28 | | use nativelink_proto::build::bazel::remote::execution::v2::{ |
29 | | BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest, |
30 | | BatchUpdateBlobsResponse, Directory, FindMissingBlobsRequest, FindMissingBlobsResponse, |
31 | | GetTreeRequest, GetTreeResponse, batch_read_blobs_response, batch_update_blobs_response, |
32 | | compressor, |
33 | | }; |
34 | | use nativelink_proto::google::rpc::Status as GrpcStatus; |
35 | | use nativelink_store::ac_utils::get_and_decode_digest; |
36 | | use nativelink_store::grpc_store::GrpcStore; |
37 | | use nativelink_store::store_manager::StoreManager; |
38 | | use nativelink_util::common::DigestInfo; |
39 | | use nativelink_util::digest_hasher::make_ctx_for_hash_func; |
40 | | use nativelink_util::store_trait::{Store, StoreLike}; |
41 | | use opentelemetry::context::FutureExt; |
42 | | use tonic::{Request, Response, Status}; |
43 | | use tracing::{Instrument, Level, debug, error_span, instrument}; |
44 | | |
45 | | #[derive(Debug)] |
46 | | pub struct CasServer { |
47 | | stores: HashMap<String, Store>, |
48 | | } |
49 | | |
50 | | type GetTreeStream = Pin<Box<dyn Stream<Item = Result<GetTreeResponse, Status>> + Send + 'static>>; |
51 | | |
52 | | /// Per-blob deadline applied inside `BatchReadBlobs` / `BatchUpdateBlobs`. |
53 | | const BATCH_PER_BLOB_TIMEOUT: Duration = Duration::from_secs(30); |
54 | | |
55 | | impl CasServer { |
56 | 10 | pub fn new( |
57 | 10 | configs: &[WithInstanceName<CasStoreConfig>], |
58 | 10 | store_manager: &StoreManager, |
59 | 10 | ) -> Result<Self, Error> { |
60 | 10 | let mut stores = HashMap::with_capacity(configs.len()); |
61 | 10 | for config in configs { |
62 | 10 | let store = store_manager.get_store(&config.cas_store).ok_or_else(|| {0 |
63 | 0 | make_input_err!("'cas_store': '{}' does not exist", config.cas_store) |
64 | 0 | })?; |
65 | 10 | stores.insert(config.instance_name.clone(), store); |
66 | | } |
67 | 10 | Ok(Self { stores }) |
68 | 10 | } |
69 | | |
70 | 0 | pub fn into_service(self) -> Server<Self> { |
71 | 0 | Server::new(self) |
72 | 0 | } |
73 | | |
74 | 4 | async fn inner_find_missing_blobs( |
75 | 4 | &self, |
76 | 4 | request: FindMissingBlobsRequest, |
77 | 4 | ) -> Result<Response<FindMissingBlobsResponse>, Error> { |
78 | 4 | let instance_name = &request.instance_name; |
79 | 4 | let store = self |
80 | 4 | .stores |
81 | 4 | .get(instance_name) |
82 | 4 | .err_tip(|| format!0 ("'instance_name' not configured for '{instance_name}'"))?0 |
83 | 4 | .clone(); |
84 | | |
85 | 4 | let mut requested_blobs = Vec::with_capacity(request.blob_digests.len()); |
86 | 7 | for digest in &request.blob_digests4 { |
87 | 7 | requested_blobs.push6 (DigestInfo::try_from(digest.clone())?1 .into6 ()); |
88 | | } |
89 | 3 | let sizes = store |
90 | 3 | .has_many(&requested_blobs) |
91 | 3 | .await |
92 | 3 | .err_tip(|| "In find_missing_blobs")?0 ; |
93 | 3 | let missing_blob_digests = sizes |
94 | 3 | .into_iter() |
95 | 3 | .zip(request.blob_digests) |
96 | 5 | .filter_map3 (|(maybe_size, digest)| maybe_size.map_or_else(|| Some(digest2 ), |_| None)) |
97 | 3 | .collect(); |
98 | | |
99 | 3 | Ok(Response::new(FindMissingBlobsResponse { |
100 | 3 | missing_blob_digests, |
101 | 3 | })) |
102 | 4 | } |
103 | | |
104 | 3 | async fn inner_batch_update_blobs( |
105 | 3 | &self, |
106 | 3 | request: BatchUpdateBlobsRequest, |
107 | 3 | ) -> Result<Response<BatchUpdateBlobsResponse>, Error> { |
108 | 3 | let instance_name = &request.instance_name; |
109 | | |
110 | 3 | let store = self |
111 | 3 | .stores |
112 | 3 | .get(instance_name) |
113 | 3 | .err_tip(|| format!0 ("'instance_name' not configured for '{instance_name}'"))?0 |
114 | 3 | .clone(); |
115 | | |
116 | | // If we are a GrpcStore we shortcut here, as this is a special store. |
117 | | // Note: We don't know the digests here, so we try to perform a very shallow |
118 | | // check to see if it's a grpc store. |
119 | 3 | if let Some(grpc_store0 ) = store.downcast_ref::<GrpcStore>(None) { |
120 | 0 | return grpc_store.batch_update_blobs(Request::new(request)).await; |
121 | 3 | } |
122 | | |
123 | 3 | let store_ref = &store; |
124 | 3 | let update_futures: FuturesUnordered<_> = request |
125 | 3 | .requests |
126 | 3 | .into_iter() |
127 | 4 | .map3 (|request| async move { |
128 | 4 | let digest = request |
129 | 4 | .digest |
130 | 4 | .clone() |
131 | 4 | .err_tip(|| "Digest not found in request")?0 ; |
132 | 4 | let request_data = request.data; |
133 | 4 | let digest_info = DigestInfo::try_from(digest.clone())?0 ; |
134 | 4 | let size_bytes = usize::try_from(digest_info.size_bytes()) |
135 | 4 | .err_tip(|| "Digest size_bytes was not convertible to usize")?0 ; |
136 | 0 | error_if!( |
137 | 4 | size_bytes != request_data.len(), |
138 | | "Digest for upload had mismatching sizes, digest said {} data said {}", |
139 | | size_bytes, |
140 | 0 | request_data.len() |
141 | | ); |
142 | | // Apply a per-blob deadline so one slow upload does not |
143 | | // make the whole batch hit the client's overall deadline. |
144 | 4 | let result = match tokio::time::timeout( |
145 | | BATCH_PER_BLOB_TIMEOUT, |
146 | 4 | store_ref.update_oneshot(digest_info, request_data), |
147 | | ) |
148 | 4 | .await |
149 | | { |
150 | 3 | Ok(r) => r.err_tip(|| "Error writing to store"), |
151 | 1 | Err(_elapsed) => Err(make_err!( |
152 | 1 | Code::DeadlineExceeded, |
153 | 1 | "BatchUpdateBlobs per-blob timeout ({} s) elapsed for digest {}", |
154 | 1 | BATCH_PER_BLOB_TIMEOUT.as_secs(), |
155 | 1 | digest_info, |
156 | 1 | )), |
157 | | }; |
158 | | Ok::<_, Error>(batch_update_blobs_response::Response { |
159 | 4 | digest: Some(digest), |
160 | 4 | status: Some(result.map_or_else(Into::into, |()| GrpcStatus::default3 ())), |
161 | | }) |
162 | 8 | }) |
163 | 3 | .collect(); |
164 | 3 | let responses = update_futures |
165 | 3 | .try_collect::<Vec<batch_update_blobs_response::Response>>() |
166 | 3 | .await?0 ; |
167 | | |
168 | 3 | Ok(Response::new(BatchUpdateBlobsResponse { responses })) |
169 | 3 | } |
170 | | |
171 | 2 | async fn inner_batch_read_blobs( |
172 | 2 | &self, |
173 | 2 | request: BatchReadBlobsRequest, |
174 | 2 | ) -> Result<Response<BatchReadBlobsResponse>, Error> { |
175 | 2 | let instance_name = &request.instance_name; |
176 | | |
177 | 2 | let store = self |
178 | 2 | .stores |
179 | 2 | .get(instance_name) |
180 | 2 | .err_tip(|| format!0 ("'instance_name' not configured for '{instance_name}'"))?0 |
181 | 2 | .clone(); |
182 | | |
183 | | // If we are a GrpcStore we shortcut here, as this is a special store. |
184 | | // Note: We don't know the digests here, so we try to perform a very shallow |
185 | | // check to see if it's a grpc store. |
186 | 2 | if let Some(grpc_store0 ) = store.downcast_ref::<GrpcStore>(None) { |
187 | 0 | return grpc_store.batch_read_blobs(Request::new(request)).await; |
188 | 2 | } |
189 | | |
190 | 2 | let store_ref = &store; |
191 | 2 | let read_futures: FuturesUnordered<_> = request |
192 | 2 | .digests |
193 | 2 | .into_iter() |
194 | 4 | .map2 (|digest| async move { |
195 | 4 | let digest_copy = DigestInfo::try_from(digest.clone())?0 ; |
196 | | // TODO(palfrey) There is a security risk here of someone taking all the memory on the instance. |
197 | | // Apply a per-blob deadline so one slow read does not |
198 | | // make the whole batch hit the client's overall deadline. |
199 | 4 | let result = match tokio::time::timeout( |
200 | | BATCH_PER_BLOB_TIMEOUT, |
201 | 4 | store_ref.get_part_unchunked(digest_copy, 0, None), |
202 | | ) |
203 | 4 | .await |
204 | | { |
205 | 3 | Ok(r) => r.err_tip(|| "Error reading from store"), |
206 | 1 | Err(_elapsed) => Err(make_err!( |
207 | 1 | Code::DeadlineExceeded, |
208 | 1 | "BatchReadBlobs per-blob timeout ({} s) elapsed for digest {}", |
209 | 1 | BATCH_PER_BLOB_TIMEOUT.as_secs(), |
210 | 1 | digest_copy, |
211 | 1 | )), |
212 | | }; |
213 | 4 | let (status, data) = result.map_or_else( |
214 | 2 | |mut e| { |
215 | 2 | if e.code == Code::NotFound { |
216 | 1 | // Trim the error messages. Not Found is quite common and we don't want to send a large |
217 | 1 | // error (debug) message for something that is common. We truncate to just the first |
218 | 1 | // entry in `messages` as it will be the most relevant. |
219 | 1 | e.messages.resize_with(1, String::new); |
220 | 1 | } |
221 | 2 | (e.into(), Bytes::new()) |
222 | 2 | }, |
223 | 2 | |v| (GrpcStatus::default(), v), |
224 | | ); |
225 | 4 | Ok::<_, Error>(batch_read_blobs_response::Response { |
226 | 4 | status: Some(status), |
227 | 4 | digest: Some(digest), |
228 | 4 | compressor: compressor::Value::Identity.into(), |
229 | 4 | data, |
230 | 4 | }) |
231 | 8 | }) |
232 | 2 | .collect(); |
233 | 2 | let responses = read_futures |
234 | 2 | .try_collect::<Vec<batch_read_blobs_response::Response>>() |
235 | 2 | .await?0 ; |
236 | | |
237 | 2 | Ok(Response::new(BatchReadBlobsResponse { responses })) |
238 | 2 | } |
239 | | |
240 | 6 | async fn inner_get_tree( |
241 | 6 | &self, |
242 | 6 | request: GetTreeRequest, |
243 | 6 | ) -> Result<impl Stream<Item = Result<GetTreeResponse, Status>> + Send + use<>, Error> { |
244 | 6 | let instance_name = &request.instance_name; |
245 | | |
246 | 6 | let store = self |
247 | 6 | .stores |
248 | 6 | .get(instance_name) |
249 | 6 | .err_tip(|| format!0 ("'instance_name' not configured for '{instance_name}'"))?0 |
250 | 6 | .clone(); |
251 | | |
252 | | // If we are a GrpcStore we shortcut here, as this is a special store. |
253 | | // Note: We don't know the digests here, so we try to perform a very shallow |
254 | | // check to see if it's a grpc store. |
255 | 6 | if let Some(grpc_store0 ) = store.downcast_ref::<GrpcStore>(None) { |
256 | 0 | let stream = grpc_store |
257 | 0 | .get_tree(Request::new(request)) |
258 | 0 | .await? |
259 | 0 | .into_inner(); |
260 | 0 | return Ok(stream.left_stream()); |
261 | 6 | } |
262 | 6 | let root_digest: DigestInfo = request |
263 | 6 | .root_digest |
264 | 6 | .err_tip(|| "Expected root_digest to exist in GetTreeRequest")?0 |
265 | 6 | .try_into() |
266 | 6 | .err_tip(|| "In GetTreeRequest::root_digest")?0 ; |
267 | | |
268 | 6 | let mut deque: VecDeque<DigestInfo> = VecDeque::new(); |
269 | 6 | let mut directories: Vec<Directory> = Vec::new(); |
270 | | // `page_token` holds the `{hash_str}-{size_bytes}` of the current request's first directory digest. |
271 | 6 | let page_token_digest = if request.page_token.is_empty() { |
272 | 2 | root_digest |
273 | | } else { |
274 | 4 | let mut page_token_parts = request.page_token.split('-'); |
275 | 4 | DigestInfo::try_new( |
276 | 4 | page_token_parts |
277 | 4 | .next() |
278 | 4 | .err_tip(|| "Failed to parse `hash_str` in `page_token`")?0 , |
279 | 4 | page_token_parts |
280 | 4 | .next() |
281 | 4 | .err_tip(|| "Failed to parse `size_bytes` in `page_token`")?0 |
282 | 4 | .parse::<i64>() |
283 | 4 | .err_tip(|| "Failed to parse `size_bytes` as i64")?0 , |
284 | | ) |
285 | 4 | .err_tip(|| "Failed to parse `page_token` as `Digest` in `GetTreeRequest`")?0 |
286 | | }; |
287 | 6 | let page_size = request.page_size; |
288 | | // If `page_size` is 0, paging is not necessary. |
289 | 6 | let mut page_token_matched = page_size == 0; |
290 | 6 | deque.push_back(root_digest); |
291 | | |
292 | 28 | while !deque.is_empty() { |
293 | 26 | let digest: DigestInfo = deque.pop_front().err_tip(|| "In VecDeque::pop_front")?0 ; |
294 | 26 | let directory = get_and_decode_digest::<Directory>(&store, digest.into()) |
295 | 26 | .await |
296 | 26 | .err_tip(|| "Converting digest to Directory")?0 ; |
297 | 26 | if digest == page_token_digest { |
298 | 6 | page_token_matched = true; |
299 | 20 | } |
300 | 30 | for directory in &directory.directories26 { |
301 | 30 | let digest: DigestInfo = directory |
302 | 30 | .digest |
303 | 30 | .clone() |
304 | 30 | .err_tip(|| "Expected Digest to exist in Directory::directories::digest")?0 |
305 | 30 | .try_into() |
306 | 30 | .err_tip(|| "In Directory::file::digest")?0 ; |
307 | 30 | deque.push_back(digest); |
308 | | } |
309 | | |
310 | 26 | let page_size_usize = usize::try_from(page_size).unwrap_or(usize::MAX); |
311 | | |
312 | 26 | if page_token_matched { |
313 | 20 | directories.push(directory); |
314 | 20 | if directories.len() == page_size_usize { |
315 | 4 | break; |
316 | 16 | } |
317 | 6 | } |
318 | | } |
319 | | // `next_page_token` will be the `{hash_str}-{size_bytes}` of the next request's first directory digest. |
320 | | // It will be an empty string when the end of the directory tree has been reached. |
321 | 6 | let next_page_token: String = deque |
322 | 6 | .front() |
323 | 6 | .map_or_else(String::new, |value| format!3 ("{value}")); |
324 | | |
325 | 6 | Ok(futures::stream::once(async { |
326 | 6 | Ok(GetTreeResponse { |
327 | 6 | directories, |
328 | 6 | next_page_token, |
329 | 6 | }) |
330 | 6 | }) |
331 | 6 | .right_stream()) |
332 | 6 | } |
333 | | } |
334 | | |
335 | | #[tonic::async_trait] |
336 | | impl ContentAddressableStorage for CasServer { |
337 | | type GetTreeStream = GetTreeStream; |
338 | | |
339 | | #[instrument( |
340 | | err, |
341 | | ret(level = Level::DEBUG), |
342 | | level = Level::ERROR, |
343 | | skip_all, |
344 | | fields( |
345 | | // Mostly to skip request.blob_digests which is sometimes enormous |
346 | | request.instance_name = ?grpc_request.get_ref().instance_name, |
347 | | request.digest_function = ?grpc_request.get_ref().digest_function |
348 | | ) |
349 | | )] |
350 | | async fn find_missing_blobs( |
351 | | &self, |
352 | | grpc_request: Request<FindMissingBlobsRequest>, |
353 | | ) -> Result<Response<FindMissingBlobsResponse>, Status> { |
354 | | let request = grpc_request.into_inner(); |
355 | | let digest_function = request.digest_function; |
356 | | self.inner_find_missing_blobs(request) |
357 | | .instrument(error_span!("cas_server_find_missing_blobs")) |
358 | | .with_context( |
359 | | make_ctx_for_hash_func(digest_function) |
360 | | .err_tip(|| "In CasServer::find_missing_blobs")?, |
361 | | ) |
362 | | .await |
363 | | .err_tip(|| "Failed on find_missing_blobs() command") |
364 | | .map_err(Into::into) |
365 | | } |
366 | | |
367 | | #[instrument( |
368 | | err, |
369 | | ret(level = Level::DEBUG), |
370 | | level = Level::ERROR, |
371 | | skip_all, |
372 | | fields(request = ?grpc_request.get_ref()) |
373 | | )] |
374 | | async fn batch_update_blobs( |
375 | | &self, |
376 | | grpc_request: Request<BatchUpdateBlobsRequest>, |
377 | | ) -> Result<Response<BatchUpdateBlobsResponse>, Status> { |
378 | | let request = grpc_request.into_inner(); |
379 | | let digest_function = request.digest_function; |
380 | | |
381 | | self.inner_batch_update_blobs(request) |
382 | | .instrument(error_span!("cas_server_batch_update_blobs")) |
383 | | .with_context( |
384 | | make_ctx_for_hash_func(digest_function) |
385 | | .err_tip(|| "In CasServer::batch_update_blobs")?, |
386 | | ) |
387 | | .await |
388 | | .err_tip(|| "Failed on batch_update_blobs() command") |
389 | | .map_err(Into::into) |
390 | | } |
391 | | |
392 | | #[instrument( |
393 | | err, |
394 | | ret(level = Level::INFO), |
395 | | level = Level::ERROR, |
396 | | skip_all, |
397 | | fields(request = ?grpc_request.get_ref()) |
398 | | )] |
399 | | async fn batch_read_blobs( |
400 | | &self, |
401 | | grpc_request: Request<BatchReadBlobsRequest>, |
402 | | ) -> Result<Response<BatchReadBlobsResponse>, Status> { |
403 | | let request = grpc_request.into_inner(); |
404 | | let digest_function = request.digest_function; |
405 | | |
406 | | self.inner_batch_read_blobs(request) |
407 | | .instrument(error_span!("cas_server_batch_read_blobs")) |
408 | | .with_context( |
409 | | make_ctx_for_hash_func(digest_function) |
410 | | .err_tip(|| "In CasServer::batch_read_blobs")?, |
411 | | ) |
412 | | .await |
413 | | .err_tip(|| "Failed on batch_read_blobs() command") |
414 | | .map_err(Into::into) |
415 | | } |
416 | | |
417 | | #[instrument( |
418 | | err, |
419 | | level = Level::ERROR, |
420 | | skip_all, |
421 | | fields(request = ?grpc_request.get_ref()) |
422 | | )] |
423 | | async fn get_tree( |
424 | | &self, |
425 | | grpc_request: Request<GetTreeRequest>, |
426 | | ) -> Result<Response<Self::GetTreeStream>, Status> { |
427 | | let request = grpc_request.into_inner(); |
428 | | let digest_function = request.digest_function; |
429 | | |
430 | | let resp = self |
431 | | .inner_get_tree(request) |
432 | | .instrument(error_span!("cas_server_get_tree")) |
433 | | .with_context( |
434 | | make_ctx_for_hash_func(digest_function).err_tip(|| "In CasServer::get_tree")?, |
435 | | ) |
436 | | .await |
437 | | .err_tip(|| "Failed on get_tree() command") |
438 | 6 | .map(|stream| -> Response<Self::GetTreeStream> { Response::new(Box::pin(stream)) }) |
439 | | .map_err(Into::into); |
440 | | |
441 | | if resp.is_ok() { |
442 | | debug!(return = "Ok(<stream>)"); |
443 | | } |
444 | | resp |
445 | | } |
446 | | } |