/build/source/nativelink-store/src/grpc_store.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::borrow::Cow; |
16 | | use std::pin::Pin; |
17 | | use std::sync::Arc; |
18 | | use std::time::Duration; |
19 | | |
20 | | use async_trait::async_trait; |
21 | | use bytes::BytesMut; |
22 | | use futures::stream::{FuturesUnordered, unfold}; |
23 | | use futures::{Future, Stream, StreamExt, TryFutureExt, TryStreamExt, future}; |
24 | | use nativelink_config::stores::GrpcSpec; |
25 | | use nativelink_error::{Error, ResultExt, error_if, make_input_err}; |
26 | | use nativelink_metric::MetricsComponent; |
27 | | use nativelink_proto::build::bazel::remote::execution::v2::action_cache_client::ActionCacheClient; |
28 | | use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_client::ContentAddressableStorageClient; |
29 | | use nativelink_proto::build::bazel::remote::execution::v2::{ |
30 | | ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest, |
31 | | BatchUpdateBlobsResponse, FindMissingBlobsRequest, FindMissingBlobsResponse, |
32 | | GetActionResultRequest, GetTreeRequest, GetTreeResponse, UpdateActionResultRequest, |
33 | | }; |
34 | | use nativelink_proto::google::bytestream::byte_stream_client::ByteStreamClient; |
35 | | use nativelink_proto::google::bytestream::{ |
36 | | QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest, |
37 | | WriteResponse, |
38 | | }; |
39 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
40 | | use nativelink_util::common::DigestInfo; |
41 | | use nativelink_util::connection_manager::ConnectionManager; |
42 | | use nativelink_util::digest_hasher::{ACTIVE_HASHER_FUNC, default_digest_hasher_func}; |
43 | | use nativelink_util::health_utils::HealthStatusIndicator; |
44 | | use nativelink_util::origin_context::ActiveOriginContext; |
45 | | use nativelink_util::proto_stream_utils::{ |
46 | | FirstStream, WriteRequestStreamWrapper, WriteState, WriteStateWrapper, |
47 | | }; |
48 | | use nativelink_util::resource_info::ResourceInfo; |
49 | | use nativelink_util::retry::{Retrier, RetryResult}; |
50 | | use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo}; |
51 | | use nativelink_util::{default_health_status_indicator, tls_utils}; |
52 | | use parking_lot::Mutex; |
53 | | use prost::Message; |
54 | | use rand::Rng; |
55 | | use tokio::time::sleep; |
56 | | use tonic::{IntoRequest, Request, Response, Status, Streaming}; |
57 | | use tracing::{Level, event}; |
58 | | use uuid::Uuid; |
59 | | |
60 | | // This store is usually a pass-through store, but can also be used as a CAS store. Using it as an |
61 | | // AC store has one major side-effect... The has() function may not give the proper size of the |
62 | | // underlying data. This might cause issues if embedded in certain stores. |
63 | | #[derive(Debug, MetricsComponent)] |
64 | | pub struct GrpcStore { |
65 | | #[metric(help = "Instance name for the store")] |
66 | | instance_name: String, |
67 | | store_type: nativelink_config::stores::StoreType, |
68 | | retrier: Retrier, |
69 | | connection_manager: ConnectionManager, |
70 | | } |
71 | | |
72 | | impl GrpcStore { |
73 | 0 | pub async fn new(spec: &GrpcSpec) -> Result<Arc<Self>, Error> { |
74 | 0 | let jitter_amt = spec.retry.jitter; |
75 | 0 | Self::new_with_jitter( |
76 | 0 | spec, |
77 | 0 | Box::new(move |delay: Duration| { |
78 | 0 | if jitter_amt == 0. { Branch (78:20): [True: 0, False: 0]
Branch (78:20): [Folded - Ignored]
|
79 | 0 | return delay; |
80 | 0 | } |
81 | 0 | let min = 1. - (jitter_amt / 2.); |
82 | 0 | let max = 1. + (jitter_amt / 2.); |
83 | 0 | delay.mul_f32(rand::rng().random_range(min..max)) |
84 | 0 | }), |
85 | | ) |
86 | 0 | .await |
87 | 0 | } |
88 | | |
89 | 0 | pub async fn new_with_jitter( |
90 | 0 | spec: &GrpcSpec, |
91 | 0 | jitter_fn: Box<dyn Fn(Duration) -> Duration + Send + Sync>, |
92 | 0 | ) -> Result<Arc<Self>, Error> { |
93 | 0 | error_if!( |
94 | 0 | spec.endpoints.is_empty(), Branch (94:13): [True: 0, False: 0]
Branch (94:13): [Folded - Ignored]
|
95 | | "Expected at least 1 endpoint in GrpcStore" |
96 | | ); |
97 | 0 | let mut endpoints = Vec::with_capacity(spec.endpoints.len()); |
98 | 0 | for endpoint_config in &spec.endpoints { |
99 | 0 | let endpoint = tls_utils::endpoint(endpoint_config) |
100 | 0 | .map_err(|e| make_input_err!("Invalid URI for GrpcStore endpoint : {e:?}"))?; |
101 | 0 | endpoints.push(endpoint); |
102 | | } |
103 | | |
104 | 0 | let jitter_fn = Arc::new(jitter_fn); |
105 | 0 | Ok(Arc::new(GrpcStore { |
106 | 0 | instance_name: spec.instance_name.clone(), |
107 | 0 | store_type: spec.store_type, |
108 | 0 | retrier: Retrier::new( |
109 | 0 | Arc::new(|duration| Box::pin(sleep(duration))), |
110 | 0 | jitter_fn.clone(), |
111 | 0 | spec.retry.clone(), |
112 | 0 | ), |
113 | 0 | connection_manager: ConnectionManager::new( |
114 | 0 | endpoints.into_iter(), |
115 | 0 | spec.connections_per_endpoint, |
116 | 0 | spec.max_concurrent_requests, |
117 | 0 | spec.retry.clone(), |
118 | 0 | jitter_fn, |
119 | | ), |
120 | | })) |
121 | 0 | } |
122 | | |
123 | 0 | async fn perform_request<F, Fut, R, I>(&self, input: I, mut request: F) -> Result<R, Error> |
124 | 0 | where |
125 | 0 | F: FnMut(I) -> Fut + Send + Copy, |
126 | 0 | Fut: Future<Output = Result<R, Error>> + Send, |
127 | 0 | R: Send, |
128 | 0 | I: Send + Clone, |
129 | 0 | { |
130 | 0 | self.retrier |
131 | 0 | .retry(unfold(input, move |input| async move { |
132 | 0 | let input_clone = input.clone(); |
133 | 0 | Some(( |
134 | 0 | request(input_clone) |
135 | 0 | .await |
136 | 0 | .map_or_else(RetryResult::Retry, RetryResult::Ok), |
137 | 0 | input, |
138 | 0 | )) |
139 | 0 | })) |
140 | 0 | .await |
141 | 0 | } |
142 | | |
143 | 0 | pub async fn find_missing_blobs( |
144 | 0 | &self, |
145 | 0 | grpc_request: Request<FindMissingBlobsRequest>, |
146 | 0 | ) -> Result<Response<FindMissingBlobsResponse>, Error> { |
147 | 0 | error_if!( |
148 | 0 | matches!(self.store_type, nativelink_config::stores::StoreType::Ac), |
149 | | "CAS operation on AC store" |
150 | | ); |
151 | | |
152 | 0 | let mut request = grpc_request.into_inner(); |
153 | 0 | request.instance_name.clone_from(&self.instance_name); |
154 | 0 | self.perform_request(request, |request| async move { |
155 | 0 | let channel = self |
156 | 0 | .connection_manager |
157 | 0 | .connection() |
158 | 0 | .await |
159 | 0 | .err_tip(|| "in find_missing_blobs")?; |
160 | 0 | ContentAddressableStorageClient::new(channel) |
161 | 0 | .find_missing_blobs(Request::new(request)) |
162 | 0 | .await |
163 | 0 | .err_tip(|| "in GrpcStore::find_missing_blobs") |
164 | 0 | }) |
165 | 0 | .await |
166 | 0 | } |
167 | | |
168 | 0 | pub async fn batch_update_blobs( |
169 | 0 | &self, |
170 | 0 | grpc_request: Request<BatchUpdateBlobsRequest>, |
171 | 0 | ) -> Result<Response<BatchUpdateBlobsResponse>, Error> { |
172 | 0 | error_if!( |
173 | 0 | matches!(self.store_type, nativelink_config::stores::StoreType::Ac), |
174 | | "CAS operation on AC store" |
175 | | ); |
176 | | |
177 | 0 | let mut request = grpc_request.into_inner(); |
178 | 0 | request.instance_name.clone_from(&self.instance_name); |
179 | 0 | self.perform_request(request, |request| async move { |
180 | 0 | let channel = self |
181 | 0 | .connection_manager |
182 | 0 | .connection() |
183 | 0 | .await |
184 | 0 | .err_tip(|| "in batch_update_blobs")?; |
185 | 0 | ContentAddressableStorageClient::new(channel) |
186 | 0 | .batch_update_blobs(Request::new(request)) |
187 | 0 | .await |
188 | 0 | .err_tip(|| "in GrpcStore::batch_update_blobs") |
189 | 0 | }) |
190 | 0 | .await |
191 | 0 | } |
192 | | |
193 | 0 | pub async fn batch_read_blobs( |
194 | 0 | &self, |
195 | 0 | grpc_request: Request<BatchReadBlobsRequest>, |
196 | 0 | ) -> Result<Response<BatchReadBlobsResponse>, Error> { |
197 | 0 | error_if!( |
198 | 0 | matches!(self.store_type, nativelink_config::stores::StoreType::Ac), |
199 | | "CAS operation on AC store" |
200 | | ); |
201 | | |
202 | 0 | let mut request = grpc_request.into_inner(); |
203 | 0 | request.instance_name.clone_from(&self.instance_name); |
204 | 0 | self.perform_request(request, |request| async move { |
205 | 0 | let channel = self |
206 | 0 | .connection_manager |
207 | 0 | .connection() |
208 | 0 | .await |
209 | 0 | .err_tip(|| "in batch_read_blobs")?; |
210 | 0 | ContentAddressableStorageClient::new(channel) |
211 | 0 | .batch_read_blobs(Request::new(request)) |
212 | 0 | .await |
213 | 0 | .err_tip(|| "in GrpcStore::batch_read_blobs") |
214 | 0 | }) |
215 | 0 | .await |
216 | 0 | } |
217 | | |
218 | 0 | pub async fn get_tree( |
219 | 0 | &self, |
220 | 0 | grpc_request: Request<GetTreeRequest>, |
221 | 0 | ) -> Result<Response<Streaming<GetTreeResponse>>, Error> { |
222 | 0 | error_if!( |
223 | 0 | matches!(self.store_type, nativelink_config::stores::StoreType::Ac), |
224 | | "CAS operation on AC store" |
225 | | ); |
226 | | |
227 | 0 | let mut request = grpc_request.into_inner(); |
228 | 0 | request.instance_name.clone_from(&self.instance_name); |
229 | 0 | self.perform_request(request, |request| async move { |
230 | 0 | let channel = self |
231 | 0 | .connection_manager |
232 | 0 | .connection() |
233 | 0 | .await |
234 | 0 | .err_tip(|| "in get_tree")?; |
235 | 0 | ContentAddressableStorageClient::new(channel) |
236 | 0 | .get_tree(Request::new(request)) |
237 | 0 | .await |
238 | 0 | .err_tip(|| "in GrpcStore::get_tree") |
239 | 0 | }) |
240 | 0 | .await |
241 | 0 | } |
242 | | |
243 | 0 | fn get_read_request(&self, mut request: ReadRequest) -> Result<ReadRequest, Error> { |
244 | | const IS_UPLOAD_FALSE: bool = false; |
245 | 0 | let mut resource_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_FALSE)?; |
246 | 0 | if resource_info.instance_name != self.instance_name { Branch (246:12): [True: 0, False: 0]
Branch (246:12): [Folded - Ignored]
|
247 | 0 | resource_info.instance_name = Cow::Borrowed(&self.instance_name); |
248 | 0 | request.resource_name = resource_info.to_string(IS_UPLOAD_FALSE); |
249 | 0 | } |
250 | 0 | Ok(request) |
251 | 0 | } |
252 | | |
253 | 0 | async fn read_internal( |
254 | 0 | &self, |
255 | 0 | request: ReadRequest, |
256 | 0 | ) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + use<>, Error> { |
257 | 0 | let channel = self |
258 | 0 | .connection_manager |
259 | 0 | .connection() |
260 | 0 | .await |
261 | 0 | .err_tip(|| "in read_internal")?; |
262 | 0 | let mut response = ByteStreamClient::new(channel) |
263 | 0 | .read(Request::new(request)) |
264 | 0 | .await |
265 | 0 | .err_tip(|| "in GrpcStore::read")? |
266 | 0 | .into_inner(); |
267 | 0 | let first_response = response |
268 | 0 | .message() |
269 | 0 | .await |
270 | 0 | .err_tip(|| "Fetching first chunk in GrpcStore::read()")?; |
271 | 0 | Ok(FirstStream::new(first_response, response)) |
272 | 0 | } |
273 | | |
274 | 0 | pub async fn read<R>( |
275 | 0 | &self, |
276 | 0 | grpc_request: R, |
277 | 0 | ) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + use<R>, Error> |
278 | 0 | where |
279 | 0 | R: IntoRequest<ReadRequest>, |
280 | 0 | { |
281 | 0 | error_if!( |
282 | 0 | matches!(self.store_type, nativelink_config::stores::StoreType::Ac), |
283 | | "CAS operation on AC store" |
284 | | ); |
285 | | |
286 | 0 | let request = self.get_read_request(grpc_request.into_request().into_inner())?; |
287 | 0 | self.perform_request(request, |request| async move { |
288 | 0 | self.read_internal(request).await |
289 | 0 | }) |
290 | 0 | .await |
291 | 0 | } |
292 | | |
293 | 0 | pub async fn write<T, E>( |
294 | 0 | &self, |
295 | 0 | stream: WriteRequestStreamWrapper<T>, |
296 | 0 | ) -> Result<Response<WriteResponse>, Error> |
297 | 0 | where |
298 | 0 | T: Stream<Item = Result<WriteRequest, E>> + Unpin + Send + 'static, |
299 | 0 | E: Into<Error> + 'static, |
300 | 0 | { |
301 | 0 | error_if!( |
302 | 0 | matches!(self.store_type, nativelink_config::stores::StoreType::Ac), |
303 | | "CAS operation on AC store" |
304 | | ); |
305 | | |
306 | 0 | let local_state = Arc::new(Mutex::new(WriteState::new( |
307 | 0 | self.instance_name.clone(), |
308 | 0 | stream, |
309 | | ))); |
310 | | |
311 | 0 | let result = self |
312 | 0 | .retrier |
313 | 0 | .retry(unfold(local_state, move |local_state| async move { |
314 | | // The client write may occur on a separate thread and |
315 | | // therefore in order to share the state with it we have to |
316 | | // wrap it in a Mutex and retrieve it after the write |
317 | | // has completed. There is no way to get the value back |
318 | | // from the client. |
319 | 0 | let result = self |
320 | 0 | .connection_manager |
321 | 0 | .connection() |
322 | 0 | .and_then(|channel| async { |
323 | 0 | ByteStreamClient::new(channel) |
324 | 0 | .write(WriteStateWrapper::new(local_state.clone())) |
325 | 0 | .await |
326 | 0 | .err_tip(|| "in GrpcStore::write") |
327 | 0 | }) |
328 | 0 | .await; |
329 | | |
330 | | // Get the state back from StateWrapper, this should be |
331 | | // uncontended since write has returned. |
332 | 0 | let mut local_state_locked = local_state.lock(); |
333 | | |
334 | 0 | let result = if let Some(err) = local_state_locked.take_read_stream_error() { Branch (334:37): [True: 0, False: 0]
Branch (334:37): [True: 0, False: 0]
Branch (334:37): [Folded - Ignored]
|
335 | | // If there was an error with the stream, then don't retry. |
336 | 0 | RetryResult::Err(err.append("Where read_stream_error was set")) |
337 | | } else { |
338 | | // On error determine whether it is possible to retry. |
339 | 0 | match result { |
340 | 0 | Err(err) => { |
341 | 0 | if local_state_locked.can_resume() { Branch (341:32): [True: 0, False: 0]
Branch (341:32): [True: 0, False: 0]
Branch (341:32): [Folded - Ignored]
|
342 | 0 | local_state_locked.resume(); |
343 | 0 | RetryResult::Retry(err) |
344 | | } else { |
345 | 0 | RetryResult::Err(err.append("Retry is not possible")) |
346 | | } |
347 | | } |
348 | 0 | Ok(response) => RetryResult::Ok(response), |
349 | | } |
350 | | }; |
351 | | |
352 | 0 | drop(local_state_locked); |
353 | 0 | Some((result, local_state)) |
354 | 0 | })) |
355 | 0 | .await?; |
356 | 0 | Ok(result) |
357 | 0 | } |
358 | | |
359 | 0 | pub async fn query_write_status( |
360 | 0 | &self, |
361 | 0 | grpc_request: Request<QueryWriteStatusRequest>, |
362 | 0 | ) -> Result<Response<QueryWriteStatusResponse>, Error> { |
363 | | const IS_UPLOAD_TRUE: bool = true; |
364 | | |
365 | 0 | error_if!( |
366 | 0 | matches!(self.store_type, nativelink_config::stores::StoreType::Ac), |
367 | | "CAS operation on AC store" |
368 | | ); |
369 | | |
370 | 0 | let mut request = grpc_request.into_inner(); |
371 | | |
372 | 0 | let mut request_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_TRUE)?; |
373 | 0 | if request_info.instance_name != self.instance_name { Branch (373:12): [True: 0, False: 0]
Branch (373:12): [Folded - Ignored]
Branch (373:12): [Folded - Ignored]
|
374 | 0 | request_info.instance_name = Cow::Borrowed(&self.instance_name); |
375 | 0 | request.resource_name = request_info.to_string(IS_UPLOAD_TRUE); |
376 | 0 | } |
377 | | |
378 | 0 | self.perform_request(request, |request| async move { |
379 | 0 | let channel = self |
380 | 0 | .connection_manager |
381 | 0 | .connection() |
382 | 0 | .await |
383 | 0 | .err_tip(|| "in query_write_status")?; |
384 | 0 | ByteStreamClient::new(channel) |
385 | 0 | .query_write_status(Request::new(request)) |
386 | 0 | .await |
387 | 0 | .err_tip(|| "in GrpcStore::query_write_status") |
388 | 0 | }) |
389 | 0 | .await |
390 | 0 | } |
391 | | |
392 | 0 | pub async fn get_action_result( |
393 | 0 | &self, |
394 | 0 | grpc_request: Request<GetActionResultRequest>, |
395 | 0 | ) -> Result<Response<ActionResult>, Error> { |
396 | 0 | let mut request = grpc_request.into_inner(); |
397 | 0 | request.instance_name.clone_from(&self.instance_name); |
398 | 0 | self.perform_request(request, |request| async move { |
399 | 0 | let channel = self |
400 | 0 | .connection_manager |
401 | 0 | .connection() |
402 | 0 | .await |
403 | 0 | .err_tip(|| "in get_action_result")?; |
404 | 0 | ActionCacheClient::new(channel) |
405 | 0 | .get_action_result(Request::new(request)) |
406 | 0 | .await |
407 | 0 | .err_tip(|| "in GrpcStore::get_action_result") |
408 | 0 | }) |
409 | 0 | .await |
410 | 0 | } |
411 | | |
412 | 0 | pub async fn update_action_result( |
413 | 0 | &self, |
414 | 0 | grpc_request: Request<UpdateActionResultRequest>, |
415 | 0 | ) -> Result<Response<ActionResult>, Error> { |
416 | 0 | let mut request = grpc_request.into_inner(); |
417 | 0 | request.instance_name.clone_from(&self.instance_name); |
418 | 0 | self.perform_request(request, |request| async move { |
419 | 0 | let channel = self |
420 | 0 | .connection_manager |
421 | 0 | .connection() |
422 | 0 | .await |
423 | 0 | .err_tip(|| "in update_action_result")?; |
424 | 0 | ActionCacheClient::new(channel) |
425 | 0 | .update_action_result(Request::new(request)) |
426 | 0 | .await |
427 | 0 | .err_tip(|| "in GrpcStore::update_action_result") |
428 | 0 | }) |
429 | 0 | .await |
430 | 0 | } |
431 | | |
432 | 0 | async fn get_action_result_from_digest( |
433 | 0 | &self, |
434 | 0 | digest: DigestInfo, |
435 | 0 | ) -> Result<Response<ActionResult>, Error> { |
436 | 0 | let action_result_request = GetActionResultRequest { |
437 | 0 | instance_name: self.instance_name.clone(), |
438 | 0 | action_digest: Some(digest.into()), |
439 | 0 | inline_stdout: false, |
440 | 0 | inline_stderr: false, |
441 | 0 | inline_output_files: Vec::new(), |
442 | 0 | digest_function: ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC) |
443 | 0 | .err_tip(|| "In get_action_from_store")? |
444 | 0 | .map_or_else(default_digest_hasher_func, |v| *v) |
445 | 0 | .proto_digest_func() |
446 | 0 | .into(), |
447 | 0 | }; |
448 | 0 | self.get_action_result(Request::new(action_result_request)) |
449 | 0 | .await |
450 | 0 | } |
451 | | |
452 | 0 | async fn get_action_result_as_part( |
453 | 0 | &self, |
454 | 0 | digest: DigestInfo, |
455 | 0 | writer: &mut DropCloserWriteHalf, |
456 | 0 | offset: usize, |
457 | 0 | length: Option<usize>, |
458 | 0 | ) -> Result<(), Error> { |
459 | 0 | let action_result = self |
460 | 0 | .get_action_result_from_digest(digest) |
461 | 0 | .await |
462 | 0 | .map(Response::into_inner) |
463 | 0 | .err_tip(|| "Action result not found")?; |
464 | | // TODO: Would be better to avoid all the encoding and decoding in this |
465 | | // file, however there's no way to currently get raw bytes from a |
466 | | // generated prost request unfortunately. |
467 | 0 | let mut value = BytesMut::new(); |
468 | 0 | action_result |
469 | 0 | .encode(&mut value) |
470 | 0 | .err_tip(|| "Could not encode upstream action result")?; |
471 | | |
472 | 0 | let default_len = value.len() - offset; |
473 | 0 | let length = length.unwrap_or(default_len).min(default_len); |
474 | 0 | if length > 0 { Branch (474:12): [True: 0, False: 0]
Branch (474:12): [Folded - Ignored]
|
475 | 0 | writer |
476 | 0 | .send(value.freeze().slice(offset..offset + length)) |
477 | 0 | .await |
478 | 0 | .err_tip(|| "Failed to write data in grpc store")?; |
479 | 0 | } |
480 | 0 | writer |
481 | 0 | .send_eof() |
482 | 0 | .err_tip(|| "Failed to write EOF in grpc store get_action_result_as_part")?; |
483 | 0 | Ok(()) |
484 | 0 | } |
485 | | |
486 | 0 | async fn update_action_result_from_bytes( |
487 | 0 | &self, |
488 | 0 | digest: DigestInfo, |
489 | 0 | mut reader: DropCloserReadHalf, |
490 | 0 | ) -> Result<(), Error> { |
491 | 0 | let action_result = ActionResult::decode(reader.consume(None).await?) |
492 | 0 | .err_tip(|| "Failed to decode ActionResult in update_action_result_from_bytes")?; |
493 | 0 | let update_action_request = UpdateActionResultRequest { |
494 | 0 | instance_name: self.instance_name.clone(), |
495 | 0 | action_digest: Some(digest.into()), |
496 | 0 | action_result: Some(action_result), |
497 | 0 | results_cache_policy: None, |
498 | 0 | digest_function: ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC) |
499 | 0 | .err_tip(|| "In get_action_from_store")? |
500 | 0 | .map_or_else(default_digest_hasher_func, |v| *v) |
501 | 0 | .proto_digest_func() |
502 | 0 | .into(), |
503 | 0 | }; |
504 | 0 | self.update_action_result(Request::new(update_action_request)) |
505 | 0 | .await |
506 | 0 | .map(|_| ()) |
507 | 0 | } |
508 | | } |
509 | | |
510 | | #[async_trait] |
511 | | impl StoreDriver for GrpcStore { |
512 | | // NOTE: This function can only be safely used on CAS stores. AC stores may return a size that |
513 | | // is incorrect. |
514 | | async fn has_with_results( |
515 | | self: Pin<&Self>, |
516 | | keys: &[StoreKey<'_>], |
517 | | results: &mut [Option<u64>], |
518 | 0 | ) -> Result<(), Error> { |
519 | 0 | if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) { |
520 | 0 | keys.iter() |
521 | 0 | .zip(results.iter_mut()) |
522 | 0 | .map(|(key, result)| async move { |
523 | 0 | // The length of an AC is incorrect, so we don't figure out the |
524 | 0 | // length, instead the biggest possible result is returned in the |
525 | 0 | // hope that we detect incorrect usage. |
526 | 0 | self.get_action_result_from_digest(key.borrow().into_digest()) |
527 | 0 | .await?; |
528 | 0 | *result = Some(u64::MAX); |
529 | 0 | Ok::<_, Error>(()) |
530 | 0 | }) |
531 | 0 | .collect::<FuturesUnordered<_>>() |
532 | 0 | .try_for_each(|()| future::ready(Ok(()))) |
533 | 0 | .await |
534 | 0 | .err_tip(|| "Getting upstream action cache entry")?; |
535 | 0 | return Ok(()); |
536 | 0 | } |
537 | | |
538 | 0 | let missing_blobs_response = self |
539 | 0 | .find_missing_blobs(Request::new(FindMissingBlobsRequest { |
540 | 0 | instance_name: self.instance_name.clone(), |
541 | 0 | blob_digests: keys |
542 | 0 | .iter() |
543 | 0 | .map(|k| k.borrow().into_digest().into()) |
544 | 0 | .collect(), |
545 | 0 | digest_function: ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC) |
546 | 0 | .err_tip(|| "In GrpcStore::has_with_results")? |
547 | 0 | .map_or_else(default_digest_hasher_func, |v| *v) |
548 | 0 | .proto_digest_func() |
549 | 0 | .into(), |
550 | 0 | })) |
551 | 0 | .await? |
552 | 0 | .into_inner(); |
553 | 0 |
|
554 | 0 | // Since the ordering is not guaranteed above, the matching has to check |
555 | 0 | // all missing blobs against all entries in the unsorted digest list. |
556 | 0 | // To optimise this, the missing digests are sorted and then it is |
557 | 0 | // efficient to perform a binary search for each digest within the |
558 | 0 | // missing list. |
559 | 0 | let mut missing_digests = |
560 | 0 | Vec::with_capacity(missing_blobs_response.missing_blob_digests.len()); |
561 | 0 | for missing_digest in missing_blobs_response.missing_blob_digests { |
562 | 0 | missing_digests.push(DigestInfo::try_from(missing_digest)?); |
563 | | } |
564 | 0 | missing_digests.sort_unstable(); |
565 | 0 | for (digest, result) in keys |
566 | 0 | .iter() |
567 | 0 | .map(|v| v.borrow().into_digest()) |
568 | 0 | .zip(results.iter_mut()) |
569 | | { |
570 | 0 | match missing_digests.binary_search(&digest) { |
571 | 0 | Ok(_) => *result = None, |
572 | 0 | Err(_) => *result = Some(digest.size_bytes()), |
573 | | } |
574 | | } |
575 | | |
576 | 0 | Ok(()) |
577 | 0 | } |
578 | | |
579 | | async fn update( |
580 | | self: Pin<&Self>, |
581 | | key: StoreKey<'_>, |
582 | | reader: DropCloserReadHalf, |
583 | | _size_info: UploadSizeInfo, |
584 | 0 | ) -> Result<(), Error> { |
585 | 0 | let digest = key.into_digest(); |
586 | 0 | if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) { |
587 | 0 | return self.update_action_result_from_bytes(digest, reader).await; |
588 | 0 | } |
589 | | |
590 | 0 | let digest_function = ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC) |
591 | 0 | .err_tip(|| "In GrpcStore::update()")? |
592 | 0 | .map_or_else(default_digest_hasher_func, |v| *v) |
593 | 0 | .proto_digest_func() |
594 | 0 | .as_str_name() |
595 | 0 | .to_ascii_lowercase(); |
596 | 0 | let mut buf = Uuid::encode_buffer(); |
597 | 0 | let resource_name = format!( |
598 | 0 | "{}/uploads/{}/blobs/{}/{}/{}", |
599 | 0 | &self.instance_name, |
600 | 0 | Uuid::new_v4().hyphenated().encode_lower(&mut buf), |
601 | 0 | digest_function, |
602 | 0 | digest.packed_hash(), |
603 | 0 | digest.size_bytes(), |
604 | | ); |
605 | | |
606 | | struct LocalState { |
607 | | resource_name: String, |
608 | | reader: DropCloserReadHalf, |
609 | | did_error: bool, |
610 | | bytes_received: i64, |
611 | | } |
612 | 0 | let local_state = LocalState { |
613 | 0 | resource_name, |
614 | 0 | reader, |
615 | 0 | did_error: false, |
616 | 0 | bytes_received: 0, |
617 | 0 | }; |
618 | 0 |
|
619 | 0 | let stream = Box::pin(unfold(local_state, |mut local_state| async move { |
620 | 0 | if local_state.did_error { Branch (620:16): [True: 0, False: 0]
Branch (620:16): [Folded - Ignored]
|
621 | 0 | event!( |
622 | 0 | Level::ERROR, |
623 | 0 | "GrpcStore::update() polled stream after error was returned" |
624 | | ); |
625 | 0 | return None; |
626 | 0 | } |
627 | 0 | let data = match local_state |
628 | 0 | .reader |
629 | 0 | .recv() |
630 | 0 | .await |
631 | 0 | .err_tip(|| "In GrpcStore::update()") |
632 | | { |
633 | 0 | Ok(data) => data, |
634 | 0 | Err(err) => { |
635 | 0 | local_state.did_error = true; |
636 | 0 | return Some((Err(err), local_state)); |
637 | | } |
638 | | }; |
639 | | |
640 | 0 | let write_offset = local_state.bytes_received; |
641 | 0 | local_state.bytes_received += data.len() as i64; |
642 | 0 |
|
643 | 0 | Some(( |
644 | 0 | Ok(WriteRequest { |
645 | 0 | resource_name: local_state.resource_name.clone(), |
646 | 0 | write_offset, |
647 | 0 | finish_write: data.is_empty(), // EOF is when no data was polled. |
648 | 0 | data, |
649 | 0 | }), |
650 | 0 | local_state, |
651 | 0 | )) |
652 | 0 | })); |
653 | | |
654 | 0 | self.write( |
655 | 0 | WriteRequestStreamWrapper::from(stream) |
656 | 0 | .await |
657 | 0 | .err_tip(|| "in GrpcStore::update()")?, |
658 | | ) |
659 | 0 | .await |
660 | 0 | .err_tip(|| "in GrpcStore::update()")?; |
661 | | |
662 | 0 | Ok(()) |
663 | 0 | } |
664 | | |
665 | | async fn get_part( |
666 | | self: Pin<&Self>, |
667 | | key: StoreKey<'_>, |
668 | | writer: &mut DropCloserWriteHalf, |
669 | | offset: u64, |
670 | | length: Option<u64>, |
671 | 0 | ) -> Result<(), Error> { |
672 | 0 | let digest = key.into_digest(); |
673 | 0 | if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) { |
674 | 0 | let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?; |
675 | 0 | let length = length |
676 | 0 | .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize")) |
677 | 0 | .transpose()?; |
678 | | |
679 | 0 | return self |
680 | 0 | .get_action_result_as_part(digest, writer, offset, length) |
681 | 0 | .await; |
682 | 0 | } |
683 | 0 |
|
684 | 0 | // Shortcut for empty blobs. |
685 | 0 | if digest.size_bytes() == 0 { Branch (685:12): [True: 0, False: 0]
Branch (685:12): [Folded - Ignored]
|
686 | 0 | return writer.send_eof(); |
687 | 0 | } |
688 | 0 | let digest_function = ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC) |
689 | 0 | .err_tip(|| "In GrpcStore::update()")? |
690 | 0 | .map_or_else(default_digest_hasher_func, |v| *v) |
691 | 0 | .proto_digest_func() |
692 | 0 | .as_str_name() |
693 | 0 | .to_ascii_lowercase(); |
694 | 0 | let resource_name = format!( |
695 | 0 | "{}/blobs/{}/{}/{}", |
696 | 0 | &self.instance_name, |
697 | 0 | digest_function, |
698 | 0 | digest.packed_hash(), |
699 | 0 | digest.size_bytes(), |
700 | | ); |
701 | | |
702 | | struct LocalState<'a> { |
703 | | resource_name: String, |
704 | | writer: &'a mut DropCloserWriteHalf, |
705 | | read_offset: i64, |
706 | | read_limit: i64, |
707 | | } |
708 | | |
709 | 0 | let local_state = LocalState { |
710 | 0 | resource_name, |
711 | 0 | writer, |
712 | 0 | read_offset: i64::try_from(offset).err_tip(|| "Could not convert offset to i64")?, |
713 | 0 | read_limit: i64::try_from(length.unwrap_or(0)) |
714 | 0 | .err_tip(|| "Could not convert length to i64")?, |
715 | | }; |
716 | | |
717 | 0 | self.retrier |
718 | 0 | .retry(unfold(local_state, move |mut local_state| async move { |
719 | 0 | let request = ReadRequest { |
720 | 0 | resource_name: local_state.resource_name.clone(), |
721 | 0 | read_offset: local_state.read_offset, |
722 | 0 | read_limit: local_state.read_limit, |
723 | 0 | }; |
724 | 0 | let mut stream = match self |
725 | 0 | .read_internal(request) |
726 | 0 | .await |
727 | 0 | .err_tip(|| "in GrpcStore::get_part()") |
728 | | { |
729 | 0 | Ok(stream) => stream, |
730 | 0 | Err(err) => return Some((RetryResult::Retry(err), local_state)), |
731 | | }; |
732 | | |
733 | | loop { |
734 | 0 | let data = match stream.next().await { |
735 | | // Create an empty response to represent EOF. |
736 | 0 | None => bytes::Bytes::new(), |
737 | 0 | Some(Ok(message)) => message.data, |
738 | 0 | Some(Err(status)) => { |
739 | 0 | return Some(( |
740 | 0 | RetryResult::Retry( |
741 | 0 | Into::<Error>::into(status) |
742 | 0 | .append("While fetching message in GrpcStore::get_part()"), |
743 | 0 | ), |
744 | 0 | local_state, |
745 | 0 | )); |
746 | | } |
747 | | }; |
748 | 0 | let length = data.len() as i64; |
749 | 0 | // This is the usual exit from the loop at EOF. |
750 | 0 | if length == 0 { Branch (750:24): [True: 0, False: 0]
Branch (750:24): [Folded - Ignored]
|
751 | 0 | let eof_result = local_state |
752 | 0 | .writer |
753 | 0 | .send_eof() |
754 | 0 | .err_tip(|| "Could not send eof in GrpcStore::get_part()") |
755 | 0 | .map_or_else(RetryResult::Err, RetryResult::Ok); |
756 | 0 | return Some((eof_result, local_state)); |
757 | 0 | } |
758 | | // Forward the data upstream. |
759 | 0 | if let Err(err) = local_state Branch (759:28): [True: 0, False: 0]
Branch (759:28): [Folded - Ignored]
|
760 | 0 | .writer |
761 | 0 | .send(data) |
762 | 0 | .await |
763 | 0 | .err_tip(|| "While sending in GrpcStore::get_part()") |
764 | | { |
765 | 0 | return Some((RetryResult::Err(err), local_state)); |
766 | 0 | } |
767 | 0 | local_state.read_offset += length; |
768 | | } |
769 | 0 | })) |
770 | 0 | .await |
771 | 0 | } |
772 | | |
773 | 0 | fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver { |
774 | 0 | self |
775 | 0 | } |
776 | | |
777 | 0 | fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) { |
778 | 0 | self |
779 | 0 | } |
780 | | |
781 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> { |
782 | 0 | self |
783 | 0 | } |
784 | | } |
785 | | |
// NOTE(review): presumably supplies the default `HealthStatusIndicator`
// implementation for `GrpcStore` — confirm against the macro in `nativelink_util`.
786 | | default_health_status_indicator!(GrpcStore); |