/build/source/nativelink-store/src/grpc_store.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::pin::Pin; |
16 | | use core::time::Duration; |
17 | | use std::borrow::Cow; |
18 | | use std::sync::Arc; |
19 | | |
20 | | use async_trait::async_trait; |
21 | | use bytes::BytesMut; |
22 | | use futures::stream::{FuturesUnordered, unfold}; |
23 | | use futures::{Future, Stream, StreamExt, TryFutureExt, TryStreamExt, future}; |
24 | | use nativelink_config::stores::GrpcSpec; |
25 | | use nativelink_error::{Error, ResultExt, error_if, make_err}; |
26 | | use nativelink_metric::MetricsComponent; |
27 | | use nativelink_proto::build::bazel::remote::execution::v2::action_cache_client::ActionCacheClient; |
28 | | use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_client::ContentAddressableStorageClient; |
29 | | use nativelink_proto::build::bazel::remote::execution::v2::{ |
30 | | ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest, |
31 | | BatchUpdateBlobsResponse, FindMissingBlobsRequest, FindMissingBlobsResponse, |
32 | | GetActionResultRequest, GetTreeRequest, GetTreeResponse, UpdateActionResultRequest, |
33 | | }; |
34 | | use nativelink_proto::google::bytestream::byte_stream_client::ByteStreamClient; |
35 | | use nativelink_proto::google::bytestream::{ |
36 | | QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest, |
37 | | WriteResponse, |
38 | | }; |
39 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
40 | | use nativelink_util::common::DigestInfo; |
41 | | use nativelink_util::connection_manager::ConnectionManager; |
42 | | use nativelink_util::digest_hasher::{DigestHasherFunc, default_digest_hasher_func}; |
43 | | use nativelink_util::health_utils::HealthStatusIndicator; |
44 | | use nativelink_util::proto_stream_utils::{ |
45 | | FirstStream, WriteRequestStreamWrapper, WriteState, WriteStateWrapper, |
46 | | }; |
47 | | use nativelink_util::resource_info::ResourceInfo; |
48 | | use nativelink_util::retry::{Retrier, RetryResult}; |
49 | | use nativelink_util::store_trait::{RemoveItemCallback, StoreDriver, StoreKey, UploadSizeInfo}; |
50 | | use nativelink_util::telemetry::ClientHeaders; |
51 | | use nativelink_util::{default_health_status_indicator, tls_utils}; |
52 | | use opentelemetry::context::Context; |
53 | | use opentelemetry::global; |
54 | | use opentelemetry::propagation::Injector; |
55 | | use parking_lot::Mutex; |
56 | | use prost::Message; |
57 | | use tokio::time::sleep; |
58 | | use tonic::metadata::{Ascii, MetadataKey, MetadataValue}; |
59 | | use tonic::{Code, IntoRequest, Request, Response, Status, Streaming}; |
60 | | use tracing::{error, trace, warn}; |
61 | | use uuid::Uuid; |
62 | | |
63 | | struct TonicMetadataInjector<'a>(&'a mut tonic::metadata::MetadataMap); |
64 | | |
65 | | impl Injector for TonicMetadataInjector<'_> { |
66 | 0 | fn set(&mut self, key: &str, value: String) { |
67 | 0 | if let (Ok(k), Ok(v)) = ( |
68 | 0 | MetadataKey::from_bytes(key.as_bytes()), |
69 | 0 | MetadataValue::try_from(&value), |
70 | 0 | ) { |
71 | 0 | self.0.insert(k, v); |
72 | 0 | } |
73 | 0 | } |
74 | | } |
75 | | |
76 | | /// Adds configured static headers, forwards nominated client request headers, |
77 | | /// and injects the current OpenTelemetry trace context into an outgoing gRPC |
78 | | /// request. |
79 | 5 | fn enrich_request<T>( |
80 | 5 | mut request: Request<T>, |
81 | 5 | headers: &[(MetadataKey<Ascii>, MetadataValue<Ascii>)], |
82 | 5 | forward_headers: &[String], |
83 | 5 | ) -> Request<T> { |
84 | 5 | for (key1 , value1 ) in headers { |
85 | 1 | request.metadata_mut().insert(key.clone(), value.clone()); |
86 | 1 | } |
87 | 5 | if !forward_headers.is_empty() |
88 | 1 | && let Some(client_headers) = Context::current().get::<ClientHeaders>() |
89 | | { |
90 | 1 | for name in forward_headers { |
91 | 1 | if let Some(value) = client_headers.0.get(&name.to_lowercase()) |
92 | 1 | && let (Ok(k), Ok(v)) = ( |
93 | 1 | MetadataKey::from_bytes(name.as_bytes()), |
94 | 1 | MetadataValue::try_from(value.as_str()), |
95 | | ) |
96 | 1 | { |
97 | 1 | request.metadata_mut().insert(k, v); |
98 | 1 | }0 |
99 | | } |
100 | 4 | } |
101 | 5 | global::get_text_map_propagator(|propagator| { |
102 | 5 | propagator.inject(&mut TonicMetadataInjector(request.metadata_mut())); |
103 | 5 | }); |
104 | 5 | request |
105 | 5 | } |
106 | | |
107 | | // This store is usually a pass-through store, but can also be used as a CAS store. Using it as an |
108 | | // AC store has one major side-effect... The has() function may not give the proper size of the |
109 | | // underlying data. This might cause issues if embedded in certain stores. |
110 | | #[derive(Debug, MetricsComponent)] |
111 | | pub struct GrpcStore { |
112 | | #[metric(help = "Instance name for the store")] |
113 | | instance_name: String, |
114 | | store_type: nativelink_config::stores::StoreType, |
115 | | retrier: Retrier, |
116 | | connection_manager: ConnectionManager, |
117 | | /// Per-RPC timeout. `Duration::ZERO` means disabled. |
118 | | rpc_timeout: Duration, |
119 | | use_legacy_resource_names: bool, |
120 | | headers: Vec<(MetadataKey<Ascii>, MetadataValue<Ascii>)>, |
121 | | forward_headers: Vec<String>, |
122 | | } |
123 | | |
124 | | impl GrpcStore { |
125 | 6 | pub async fn new(spec: &GrpcSpec) -> Result<Arc<Self>, Error> { |
126 | 6 | Self::new_with_jitter(spec, spec.retry.make_jitter_fn()).await |
127 | 6 | } |
128 | | |
129 | 6 | pub async fn new_with_jitter( |
130 | 6 | spec: &GrpcSpec, |
131 | 6 | jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>, |
132 | 6 | ) -> Result<Arc<Self>, Error> { |
133 | 0 | error_if!( |
134 | 6 | spec.endpoints.is_empty(), |
135 | | "Expected at least 1 endpoint in GrpcStore" |
136 | | ); |
137 | 6 | let mut endpoints = Vec::with_capacity(spec.endpoints.len()); |
138 | 6 | for endpoint_config in &spec.endpoints { |
139 | 6 | let endpoint = tls_utils::endpoint(endpoint_config).map_err(|e| {0 |
140 | 0 | Error::from_std_err(Code::InvalidArgument, &e) |
141 | 0 | .append("Invalid URI for GrpcStore endpoint") |
142 | 0 | })?; |
143 | 6 | endpoints.push(endpoint); |
144 | | } |
145 | | |
146 | 6 | let rpc_timeout = Duration::from_secs(spec.rpc_timeout_s); |
147 | | |
148 | 6 | let mut headers = Vec::with_capacity(spec.headers.len()); |
149 | 6 | for (name1 , value1 ) in &spec.headers { |
150 | | // We lowercase keys as HTTP headers are case-insensitive so we should match all cases |
151 | 1 | let key = MetadataKey::from_bytes(name.to_lowercase().as_bytes()).map_err(|_| {0 |
152 | 0 | make_err!(Code::InvalidArgument, "Invalid gRPC metadata key: {name}") |
153 | 0 | })?; |
154 | 1 | let val = MetadataValue::try_from(value.as_str()).map_err(|_| {0 |
155 | 0 | make_err!( |
156 | 0 | Code::InvalidArgument, |
157 | | "Invalid gRPC metadata value for key: {name}" |
158 | | ) |
159 | 0 | })?; |
160 | 1 | headers.push((key, val)); |
161 | | } |
162 | | |
163 | 6 | Ok(Arc::new(Self { |
164 | 6 | instance_name: spec.instance_name.clone(), |
165 | 6 | store_type: spec.store_type, |
166 | 6 | retrier: Retrier::new( |
167 | 6 | Arc::new(|duration| Box::pin0 (sleep0 (duration0 ))), |
168 | 6 | jitter_fn.clone(), |
169 | 6 | spec.retry.clone(), |
170 | | ), |
171 | 6 | connection_manager: ConnectionManager::new( |
172 | 6 | endpoints, |
173 | 6 | spec.connections_per_endpoint, |
174 | 6 | spec.max_concurrent_requests, |
175 | 6 | spec.retry.clone(), |
176 | 6 | jitter_fn, |
177 | | ), |
178 | 6 | rpc_timeout, |
179 | 6 | use_legacy_resource_names: spec.use_legacy_resource_names, |
180 | 6 | headers, |
181 | | // We lowercase keys as HTTP headers are case-insensitive so we should match all cases |
182 | 6 | forward_headers: spec |
183 | 6 | .forward_headers |
184 | 6 | .iter() |
185 | 6 | .map(|s| s1 .to_lowercase1 ()) |
186 | 6 | .collect(), |
187 | | })) |
188 | 6 | } |
189 | | |
190 | 0 | async fn perform_request<F, Fut, R, I>(&self, input: I, mut request: F) -> Result<R, Error> |
191 | 0 | where |
192 | 0 | F: FnMut(I) -> Fut + Send + Copy, |
193 | 0 | Fut: Future<Output = Result<R, Error>> + Send, |
194 | 0 | R: Send, |
195 | 0 | I: Send + Clone, |
196 | 0 | { |
197 | 0 | self.retrier |
198 | 0 | .retry(unfold(input, move |input| async move { |
199 | 0 | let input_clone = input.clone(); |
200 | | Some(( |
201 | 0 | request(input_clone) |
202 | 0 | .await |
203 | 0 | .map_or_else(RetryResult::Retry, RetryResult::Ok), |
204 | 0 | input, |
205 | | )) |
206 | 0 | })) |
207 | 0 | .await |
208 | 0 | } |
209 | | |
210 | 1 | pub async fn find_missing_blobs( |
211 | 1 | &self, |
212 | 1 | grpc_request: Request<FindMissingBlobsRequest>, |
213 | 1 | ) -> Result<Response<FindMissingBlobsResponse>, Error> { |
214 | 0 | error_if!( |
215 | 1 | matches!(self.store_type, nativelink_config::stores::StoreType::Ac), |
216 | | "CAS operation on AC store" |
217 | | ); |
218 | | |
219 | 1 | let mut request = grpc_request.into_inner(); |
220 | | |
221 | | // Some builds (Chromium for example) do lots of empty requests for some reason, so shortcut them |
222 | 1 | if request.blob_digests.is_empty() { |
223 | 1 | return Ok(Response::new(FindMissingBlobsResponse { |
224 | 1 | missing_blob_digests: vec![], |
225 | 1 | })); |
226 | 0 | } |
227 | | |
228 | 0 | request.instance_name.clone_from(&self.instance_name); |
229 | 0 | self.perform_request(request, |request| async move { |
230 | 0 | let channel = self |
231 | 0 | .connection_manager |
232 | 0 | .connection(format!( |
233 | 0 | "find_missing_blobs: ({}) {:?}", |
234 | 0 | request.blob_digests.len(), |
235 | 0 | request.blob_digests |
236 | 0 | )) |
237 | 0 | .await |
238 | 0 | .err_tip(|| "in find_missing_blobs")?; |
239 | 0 | ContentAddressableStorageClient::new(channel) |
240 | 0 | .find_missing_blobs(enrich_request( |
241 | 0 | Request::new(request), |
242 | 0 | &self.headers, |
243 | 0 | &self.forward_headers, |
244 | 0 | )) |
245 | 0 | .await |
246 | 0 | .err_tip(|| "in GrpcStore::find_missing_blobs") |
247 | 0 | }) |
248 | 0 | .await |
249 | 1 | } |
250 | | |
251 | 0 | pub async fn batch_update_blobs( |
252 | 0 | &self, |
253 | 0 | grpc_request: Request<BatchUpdateBlobsRequest>, |
254 | 0 | ) -> Result<Response<BatchUpdateBlobsResponse>, Error> { |
255 | 0 | error_if!( |
256 | 0 | matches!(self.store_type, nativelink_config::stores::StoreType::Ac), |
257 | | "CAS operation on AC store" |
258 | | ); |
259 | | |
260 | 0 | let mut request = grpc_request.into_inner(); |
261 | 0 | request.instance_name.clone_from(&self.instance_name); |
262 | 0 | self.perform_request(request, |request| async move { |
263 | 0 | let channel = self |
264 | 0 | .connection_manager |
265 | 0 | .connection("batch_update_blobs".into()) |
266 | 0 | .await |
267 | 0 | .err_tip(|| "in batch_update_blobs")?; |
268 | 0 | ContentAddressableStorageClient::new(channel) |
269 | 0 | .batch_update_blobs(enrich_request( |
270 | 0 | Request::new(request), |
271 | 0 | &self.headers, |
272 | 0 | &self.forward_headers, |
273 | 0 | )) |
274 | 0 | .await |
275 | 0 | .err_tip(|| "in GrpcStore::batch_update_blobs") |
276 | 0 | }) |
277 | 0 | .await |
278 | 0 | } |
279 | | |
280 | 0 | pub async fn batch_read_blobs( |
281 | 0 | &self, |
282 | 0 | grpc_request: Request<BatchReadBlobsRequest>, |
283 | 0 | ) -> Result<Response<BatchReadBlobsResponse>, Error> { |
284 | 0 | error_if!( |
285 | 0 | matches!(self.store_type, nativelink_config::stores::StoreType::Ac), |
286 | | "CAS operation on AC store" |
287 | | ); |
288 | | |
289 | 0 | let mut request = grpc_request.into_inner(); |
290 | 0 | request.instance_name.clone_from(&self.instance_name); |
291 | 0 | self.perform_request(request, |request| async move { |
292 | 0 | let channel = self |
293 | 0 | .connection_manager |
294 | 0 | .connection("batch_read_blobs".into()) |
295 | 0 | .await |
296 | 0 | .err_tip(|| "in batch_read_blobs")?; |
297 | 0 | ContentAddressableStorageClient::new(channel) |
298 | 0 | .batch_read_blobs(enrich_request( |
299 | 0 | Request::new(request), |
300 | 0 | &self.headers, |
301 | 0 | &self.forward_headers, |
302 | 0 | )) |
303 | 0 | .await |
304 | 0 | .err_tip(|| "in GrpcStore::batch_read_blobs") |
305 | 0 | }) |
306 | 0 | .await |
307 | 0 | } |
308 | | |
309 | 0 | pub async fn get_tree( |
310 | 0 | &self, |
311 | 0 | grpc_request: Request<GetTreeRequest>, |
312 | 0 | ) -> Result<Response<Streaming<GetTreeResponse>>, Error> { |
313 | 0 | error_if!( |
314 | 0 | matches!(self.store_type, nativelink_config::stores::StoreType::Ac), |
315 | | "CAS operation on AC store" |
316 | | ); |
317 | | |
318 | 0 | let mut request = grpc_request.into_inner(); |
319 | 0 | request.instance_name.clone_from(&self.instance_name); |
320 | 0 | self.perform_request(request, |request| async move { |
321 | 0 | let channel = self |
322 | 0 | .connection_manager |
323 | 0 | .connection(format!("get_tree: {:?}", request.root_digest)) |
324 | 0 | .await |
325 | 0 | .err_tip(|| "in get_tree")?; |
326 | 0 | ContentAddressableStorageClient::new(channel) |
327 | 0 | .get_tree(enrich_request( |
328 | 0 | Request::new(request), |
329 | 0 | &self.headers, |
330 | 0 | &self.forward_headers, |
331 | 0 | )) |
332 | 0 | .await |
333 | 0 | .err_tip(|| "in GrpcStore::get_tree") |
334 | 0 | }) |
335 | 0 | .await |
336 | 0 | } |
337 | | |
338 | 0 | fn get_read_request(&self, mut request: ReadRequest) -> Result<ReadRequest, Error> { |
339 | | const IS_UPLOAD_FALSE: bool = false; |
340 | 0 | let mut resource_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_FALSE)?; |
341 | 0 | if resource_info.instance_name != self.instance_name { |
342 | 0 | resource_info.instance_name = Cow::Borrowed(&self.instance_name); |
343 | 0 | request.resource_name = resource_info.to_string(IS_UPLOAD_FALSE); |
344 | 0 | } |
345 | 0 | Ok(request) |
346 | 0 | } |
347 | | |
348 | 3 | async fn read_internal( |
349 | 3 | &self, |
350 | 3 | request: ReadRequest, |
351 | 3 | ) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + use<>, Error> { |
352 | 3 | let channel = self |
353 | 3 | .connection_manager |
354 | 3 | .connection(format!("read_internal: {}", request.resource_name)) |
355 | 3 | .await |
356 | 3 | .err_tip(|| "in read_internal")?0 ; |
357 | 3 | let mut response = ByteStreamClient::new(channel) |
358 | 3 | .read(enrich_request( |
359 | 3 | Request::new(request), |
360 | 3 | &self.headers, |
361 | 3 | &self.forward_headers, |
362 | 3 | )) |
363 | 3 | .await |
364 | 3 | .err_tip(|| "in GrpcStore::read")?0 |
365 | 3 | .into_inner(); |
366 | 3 | let first_response = response |
367 | 3 | .message() |
368 | 3 | .await |
369 | 3 | .err_tip(|| "Fetching first chunk in GrpcStore::read()")?0 ; |
370 | 3 | Ok(FirstStream::new(first_response, response)) |
371 | 3 | } |
372 | | |
373 | 0 | pub async fn read<R>( |
374 | 0 | &self, |
375 | 0 | grpc_request: R, |
376 | 0 | ) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + use<R>, Error> |
377 | 0 | where |
378 | 0 | R: IntoRequest<ReadRequest>, |
379 | 0 | { |
380 | 0 | error_if!( |
381 | 0 | matches!(self.store_type, nativelink_config::stores::StoreType::Ac), |
382 | | "CAS operation on AC store" |
383 | | ); |
384 | | |
385 | 0 | let request = self.get_read_request(grpc_request.into_request().into_inner())?; |
386 | 0 | self.perform_request(request, |request| async move { |
387 | 0 | self.read_internal(request).await |
388 | 0 | }) |
389 | 0 | .await |
390 | 0 | } |
391 | | |
392 | 2 | pub async fn write<T, E>( |
393 | 2 | &self, |
394 | 2 | stream: WriteRequestStreamWrapper<T>, |
395 | 2 | ) -> Result<Response<WriteResponse>, Error> |
396 | 2 | where |
397 | 2 | T: Stream<Item = Result<WriteRequest, E>> + Unpin + Send + 'static, |
398 | 2 | E: Into<Error> + 'static, |
399 | 2 | { |
400 | 0 | error_if!( |
401 | 2 | matches!(self.store_type, nativelink_config::stores::StoreType::Ac), |
402 | | "CAS operation on AC store" |
403 | | ); |
404 | | |
405 | 2 | let local_state = Arc::new(Mutex::new(WriteState::new( |
406 | 2 | self.instance_name.clone(), |
407 | 2 | stream, |
408 | | ))); |
409 | | |
410 | 2 | let write_start = std::time::Instant::now(); |
411 | 2 | let instance_name = self.instance_name.clone(); |
412 | 2 | let rpc_timeout = self.rpc_timeout; |
413 | 2 | trace!( |
414 | | instance_name = %instance_name, |
415 | 2 | rpc_timeout_s = rpc_timeout.as_secs(), |
416 | | "GrpcStore::write: starting ByteStream write", |
417 | | ); |
418 | 2 | let mut attempt: u32 = 0; |
419 | 2 | let result = self |
420 | 2 | .retrier |
421 | 2 | .retry(unfold(local_state, move |local_state| { |
422 | 2 | attempt += 1; |
423 | 2 | let instance_name = instance_name.clone(); |
424 | 2 | async move { |
425 | | // The client write may occur on a separate thread and |
426 | | // therefore in order to share the state with it we have to |
427 | | // wrap it in a Mutex and retrieve it after the write |
428 | | // has completed. There is no way to get the value back |
429 | | // from the client. |
430 | 2 | trace!( |
431 | | instance_name = %instance_name, |
432 | | attempt, |
433 | | "GrpcStore::write: requesting connection from pool", |
434 | | ); |
435 | 2 | let conn_start = std::time::Instant::now(); |
436 | 2 | let rpc_fut = self.connection_manager.connection("write".into()).and_then( |
437 | 2 | |channel| { |
438 | 2 | let conn_elapsed = conn_start.elapsed(); |
439 | 2 | let instance_for_rpc = instance_name.clone(); |
440 | 2 | let conn_elapsed_ms = |
441 | 2 | u64::try_from(conn_elapsed.as_millis()).unwrap_or(u64::MAX); |
442 | 2 | trace!( |
443 | | instance_name = %instance_for_rpc, |
444 | | conn_elapsed_ms, |
445 | | "GrpcStore::write: got connection, starting ByteStream.Write RPC", |
446 | | ); |
447 | 2 | let rpc_start = std::time::Instant::now(); |
448 | 2 | let local_state_for_rpc = local_state.clone(); |
449 | 2 | async move { |
450 | 2 | let res = ByteStreamClient::new(channel) |
451 | 2 | .write(enrich_request( |
452 | 2 | Request::new(WriteStateWrapper::new(local_state_for_rpc)), |
453 | 2 | &self.headers, |
454 | 2 | &self.forward_headers, |
455 | 2 | )) |
456 | 2 | .await |
457 | 2 | .err_tip(|| "in GrpcStore::write"); |
458 | 2 | let rpc_elapsed_ms = u64::try_from(rpc_start.elapsed().as_millis()) |
459 | 2 | .unwrap_or(u64::MAX); |
460 | 2 | trace!( |
461 | | instance_name = %instance_for_rpc, |
462 | | rpc_elapsed_ms, |
463 | 2 | success = res.is_ok(), |
464 | | "GrpcStore::write: ByteStream.Write RPC returned", |
465 | | ); |
466 | 2 | res |
467 | 2 | } |
468 | 2 | }, |
469 | | ); |
470 | | |
471 | 2 | let result = if rpc_timeout > Duration::ZERO { |
472 | 2 | match tokio::time::timeout(rpc_timeout, rpc_fut).await { |
473 | 2 | Ok(res) => res, |
474 | 0 | Err(_elapsed) => { |
475 | 0 | warn!( |
476 | | instance_name = %instance_name, |
477 | | attempt, |
478 | 0 | rpc_timeout_s = rpc_timeout.as_secs(), |
479 | | "GrpcStore::write: per-RPC timeout exceeded, cancelling", |
480 | | ); |
481 | | #[allow(unused_qualifications)] |
482 | 0 | Err(nativelink_error::make_err!( |
483 | 0 | nativelink_error::Code::DeadlineExceeded, |
484 | 0 | "GrpcStore::write RPC timed out after {}s", |
485 | 0 | rpc_timeout.as_secs() |
486 | 0 | )) |
487 | | } |
488 | | } |
489 | | } else { |
490 | 0 | rpc_fut.await |
491 | | }; |
492 | | |
493 | | // Get the state back from StateWrapper, this should be |
494 | | // uncontended since write has returned. |
495 | 2 | let mut local_state_locked = local_state.lock(); |
496 | | |
497 | 2 | let result = local_state_locked |
498 | 2 | .take_read_stream_error() |
499 | 2 | .map(|err| RetryResult::Err(err0 .append0 ("Where read_stream_error was set"))) |
500 | 2 | .unwrap_or_else(|| { |
501 | | // No stream error, handle the original result |
502 | 2 | match result { |
503 | 2 | Ok(response) => RetryResult::Ok(response), |
504 | 0 | Err(ref err) => { |
505 | 0 | warn!( |
506 | | instance_name = %instance_name, |
507 | | attempt, |
508 | | ?err, |
509 | 0 | can_resume = local_state_locked.can_resume(), |
510 | | "GrpcStore::write: RPC failed", |
511 | | ); |
512 | 0 | if local_state_locked.can_resume() { |
513 | 0 | local_state_locked.resume(); |
514 | 0 | RetryResult::Retry(err.clone()) |
515 | | } else { |
516 | 0 | RetryResult::Err( |
517 | 0 | err.clone().append("Retry is not possible"), |
518 | 0 | ) |
519 | | } |
520 | | } |
521 | | } |
522 | 2 | }); |
523 | | |
524 | 2 | drop(local_state_locked); |
525 | 2 | Some((result, local_state)) |
526 | 2 | } |
527 | 2 | })) |
528 | 2 | .await?0 ; |
529 | | |
530 | 2 | let total_elapsed_ms = u64::try_from(write_start.elapsed().as_millis()).unwrap_or(u64::MAX); |
531 | 2 | trace!( |
532 | | instance_name = %self.instance_name, |
533 | | total_elapsed_ms, |
534 | | "GrpcStore::write: completed successfully", |
535 | | ); |
536 | 2 | Ok(result) |
537 | 2 | } |
538 | | |
539 | 0 | pub async fn query_write_status( |
540 | 0 | &self, |
541 | 0 | grpc_request: Request<QueryWriteStatusRequest>, |
542 | 0 | ) -> Result<Response<QueryWriteStatusResponse>, Error> { |
543 | | const IS_UPLOAD_TRUE: bool = true; |
544 | | |
545 | 0 | error_if!( |
546 | 0 | matches!(self.store_type, nativelink_config::stores::StoreType::Ac), |
547 | | "CAS operation on AC store" |
548 | | ); |
549 | | |
550 | 0 | let mut request = grpc_request.into_inner(); |
551 | | |
552 | 0 | let mut request_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_TRUE)?; |
553 | 0 | if request_info.instance_name != self.instance_name { |
554 | 0 | request_info.instance_name = Cow::Borrowed(&self.instance_name); |
555 | 0 | request.resource_name = request_info.to_string(IS_UPLOAD_TRUE); |
556 | 0 | } |
557 | | |
558 | 0 | self.perform_request(request, |request| async move { |
559 | 0 | let channel = self |
560 | 0 | .connection_manager |
561 | 0 | .connection(format!("query_write_status: {}", request.resource_name)) |
562 | 0 | .await |
563 | 0 | .err_tip(|| "in query_write_status")?; |
564 | 0 | ByteStreamClient::new(channel) |
565 | 0 | .query_write_status(enrich_request( |
566 | 0 | Request::new(request), |
567 | 0 | &self.headers, |
568 | 0 | &self.forward_headers, |
569 | 0 | )) |
570 | 0 | .await |
571 | 0 | .err_tip(|| "in GrpcStore::query_write_status") |
572 | 0 | }) |
573 | 0 | .await |
574 | 0 | } |
575 | | |
576 | 0 | pub async fn get_action_result( |
577 | 0 | &self, |
578 | 0 | grpc_request: Request<GetActionResultRequest>, |
579 | 0 | ) -> Result<Response<ActionResult>, Error> { |
580 | 0 | let mut request = grpc_request.into_inner(); |
581 | 0 | request.instance_name.clone_from(&self.instance_name); |
582 | 0 | self.perform_request(request, |request| async move { |
583 | 0 | let channel = self |
584 | 0 | .connection_manager |
585 | 0 | .connection(format!("get_action_result: {:?}", request.action_digest)) |
586 | 0 | .await |
587 | 0 | .err_tip(|| "in get_action_result")?; |
588 | 0 | ActionCacheClient::new(channel) |
589 | 0 | .get_action_result(enrich_request( |
590 | 0 | Request::new(request), |
591 | 0 | &self.headers, |
592 | 0 | &self.forward_headers, |
593 | 0 | )) |
594 | 0 | .await |
595 | 0 | .err_tip(|| "in GrpcStore::get_action_result") |
596 | 0 | }) |
597 | 0 | .await |
598 | 0 | } |
599 | | |
600 | 0 | pub async fn update_action_result( |
601 | 0 | &self, |
602 | 0 | grpc_request: Request<UpdateActionResultRequest>, |
603 | 0 | ) -> Result<Response<ActionResult>, Error> { |
604 | 0 | let mut request = grpc_request.into_inner(); |
605 | 0 | request.instance_name.clone_from(&self.instance_name); |
606 | 0 | self.perform_request(request, |request| async move { |
607 | 0 | let channel = self |
608 | 0 | .connection_manager |
609 | 0 | .connection(format!("update_action_result: {:?}", request.action_digest)) |
610 | 0 | .await |
611 | 0 | .err_tip(|| "in update_action_result")?; |
612 | 0 | ActionCacheClient::new(channel) |
613 | 0 | .update_action_result(enrich_request( |
614 | 0 | Request::new(request), |
615 | 0 | &self.headers, |
616 | 0 | &self.forward_headers, |
617 | 0 | )) |
618 | 0 | .await |
619 | 0 | .err_tip(|| "in GrpcStore::update_action_result") |
620 | 0 | }) |
621 | 0 | .await |
622 | 0 | } |
623 | | |
624 | 0 | async fn get_action_result_from_digest( |
625 | 0 | &self, |
626 | 0 | digest: DigestInfo, |
627 | 0 | ) -> Result<Response<ActionResult>, Error> { |
628 | 0 | let action_result_request = GetActionResultRequest { |
629 | 0 | instance_name: self.instance_name.clone(), |
630 | 0 | action_digest: Some(digest.into()), |
631 | | inline_stdout: false, |
632 | | inline_stderr: false, |
633 | 0 | inline_output_files: Vec::new(), |
634 | 0 | digest_function: Context::current() |
635 | 0 | .get::<DigestHasherFunc>() |
636 | 0 | .map_or_else(default_digest_hasher_func, |v| *v) |
637 | 0 | .proto_digest_func() |
638 | 0 | .into(), |
639 | | }; |
640 | 0 | self.get_action_result(Request::new(action_result_request)) |
641 | 0 | .await |
642 | 0 | } |
643 | | |
644 | 0 | async fn get_action_result_as_part( |
645 | 0 | &self, |
646 | 0 | digest: DigestInfo, |
647 | 0 | writer: &mut DropCloserWriteHalf, |
648 | 0 | offset: usize, |
649 | 0 | length: Option<usize>, |
650 | 0 | ) -> Result<(), Error> { |
651 | 0 | let action_result = self |
652 | 0 | .get_action_result_from_digest(digest) |
653 | 0 | .await |
654 | 0 | .map(Response::into_inner) |
655 | 0 | .err_tip(|| "Action result not found")?; |
656 | | // TODO: Would be better to avoid all the encoding and decoding in this |
657 | | // file, however there's no way to currently get raw bytes from a |
658 | | // generated prost request unfortunately. |
659 | 0 | let mut value = BytesMut::new(); |
660 | 0 | action_result |
661 | 0 | .encode(&mut value) |
662 | 0 | .err_tip(|| "Could not encode upstream action result")?; |
663 | | |
664 | 0 | let default_len = value.len() - offset; |
665 | 0 | let length = length.unwrap_or(default_len).min(default_len); |
666 | 0 | if length > 0 { |
667 | 0 | writer |
668 | 0 | .send(value.freeze().slice(offset..offset + length)) |
669 | 0 | .await |
670 | 0 | .err_tip(|| "Failed to write data in grpc store")?; |
671 | 0 | } |
672 | 0 | writer |
673 | 0 | .send_eof() |
674 | 0 | .err_tip(|| "Failed to write EOF in grpc store get_action_result_as_part")?; |
675 | 0 | Ok(()) |
676 | 0 | } |
677 | | |
678 | 0 | async fn update_action_result_from_bytes( |
679 | 0 | &self, |
680 | 0 | digest: DigestInfo, |
681 | 0 | mut reader: DropCloserReadHalf, |
682 | 0 | ) -> Result<(), Error> { |
683 | 0 | let action_result = ActionResult::decode(reader.consume(None).await?) |
684 | 0 | .err_tip(|| "Failed to decode ActionResult in update_action_result_from_bytes")?; |
685 | 0 | let update_action_request = UpdateActionResultRequest { |
686 | 0 | instance_name: self.instance_name.clone(), |
687 | 0 | action_digest: Some(digest.into()), |
688 | 0 | action_result: Some(action_result), |
689 | 0 | results_cache_policy: None, |
690 | 0 | digest_function: Context::current() |
691 | 0 | .get::<DigestHasherFunc>() |
692 | 0 | .map_or_else(default_digest_hasher_func, |v| *v) |
693 | 0 | .proto_digest_func() |
694 | 0 | .into(), |
695 | | }; |
696 | 0 | self.update_action_result(Request::new(update_action_request)) |
697 | 0 | .await |
698 | 0 | .map(|_| ()) |
699 | 0 | } |
700 | | } |
701 | | |
702 | | #[async_trait] |
703 | | impl StoreDriver for GrpcStore { |
704 | | // NOTE: This function can only be safely used on CAS stores. AC stores may return a size that |
705 | | // is incorrect. |
706 | | async fn has_with_results( |
707 | | self: Pin<&Self>, |
708 | | keys: &[StoreKey<'_>], |
709 | | results: &mut [Option<u64>], |
710 | 0 | ) -> Result<(), Error> { |
711 | | if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) { |
712 | | keys.iter() |
713 | | .zip(results.iter_mut()) |
714 | 0 | .map(|(key, result)| async move { |
715 | | // The length of an AC is incorrect, so we don't figure out the |
716 | | // length, instead the biggest possible result is returned in the |
717 | | // hope that we detect incorrect usage. |
718 | 0 | self.get_action_result_from_digest(key.borrow().into_digest()) |
719 | 0 | .await?; |
720 | 0 | *result = Some(u64::MAX); |
721 | 0 | Ok::<_, Error>(()) |
722 | 0 | }) |
723 | | .collect::<FuturesUnordered<_>>() |
724 | 0 | .try_for_each(|()| future::ready(Ok(()))) |
725 | | .await |
726 | | .err_tip(|| "Getting upstream action cache entry")?; |
727 | | return Ok(()); |
728 | | } |
729 | | |
730 | | let missing_blobs_response = self |
731 | | .find_missing_blobs(Request::new(FindMissingBlobsRequest { |
732 | | instance_name: self.instance_name.clone(), |
733 | | blob_digests: keys |
734 | | .iter() |
735 | 0 | .map(|k| k.borrow().into_digest().into()) |
736 | | .collect(), |
737 | | digest_function: Context::current() |
738 | | .get::<DigestHasherFunc>() |
739 | | .map_or_else(default_digest_hasher_func, |v| *v) |
740 | | .proto_digest_func() |
741 | | .into(), |
742 | | })) |
743 | | .await? |
744 | | .into_inner(); |
745 | | |
746 | | // Since the ordering is not guaranteed above, the matching has to check |
747 | | // all missing blobs against all entries in the unsorted digest list. |
748 | | // To optimise this, the missing digests are sorted and then it is |
749 | | // efficient to perform a binary search for each digest within the |
750 | | // missing list. |
751 | | let mut missing_digests = |
752 | | Vec::with_capacity(missing_blobs_response.missing_blob_digests.len()); |
753 | | for missing_digest in missing_blobs_response.missing_blob_digests { |
754 | | missing_digests.push(DigestInfo::try_from(missing_digest)?); |
755 | | } |
756 | | missing_digests.sort_unstable(); |
757 | | for (digest, result) in keys |
758 | | .iter() |
759 | 0 | .map(|v| v.borrow().into_digest()) |
760 | | .zip(results.iter_mut()) |
761 | | { |
762 | | match missing_digests.binary_search(&digest) { |
763 | | Ok(_) => *result = None, |
764 | | Err(_) => *result = Some(digest.size_bytes()), |
765 | | } |
766 | | } |
767 | | |
768 | | Ok(()) |
769 | 0 | } |
770 | | |
771 | | async fn update( |
772 | | self: Pin<&Self>, |
773 | | key: StoreKey<'_>, |
774 | | reader: DropCloserReadHalf, |
775 | | _size_info: UploadSizeInfo, |
776 | 2 | ) -> Result<(), Error> { |
777 | | struct LocalState { |
778 | | resource_name: String, |
779 | | reader: DropCloserReadHalf, |
780 | | did_error: bool, |
781 | | bytes_received: i64, |
782 | | } |
783 | | |
784 | | let digest = key.into_digest(); |
785 | | if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) { |
786 | | return self.update_action_result_from_bytes(digest, reader).await; |
787 | | } |
788 | | |
789 | | let mut buf = Uuid::encode_buffer(); |
790 | | let resource_name = if self.use_legacy_resource_names { |
791 | | format!( |
792 | | "{}/uploads/{}/blobs/{}/{}", |
793 | | &self.instance_name, |
794 | | Uuid::new_v4().hyphenated().encode_lower(&mut buf), |
795 | | digest.packed_hash(), |
796 | | digest.size_bytes(), |
797 | | ) |
798 | | } else { |
799 | | let digest_function = Context::current() |
800 | | .get::<DigestHasherFunc>() |
801 | | .map_or_else(default_digest_hasher_func, |v| *v) |
802 | | .proto_digest_func() |
803 | | .as_str_name() |
804 | | .to_ascii_lowercase(); |
805 | | format!( |
806 | | "{}/uploads/{}/blobs/{}/{}/{}", |
807 | | &self.instance_name, |
808 | | Uuid::new_v4().hyphenated().encode_lower(&mut buf), |
809 | | digest_function, |
810 | | digest.packed_hash(), |
811 | | digest.size_bytes(), |
812 | | ) |
813 | | }; |
814 | | trace!( |
815 | | resource_name = %resource_name, |
816 | | digest_hash = %digest.packed_hash(), |
817 | | digest_size = digest.size_bytes(), |
818 | | "GrpcStore::update: starting upload for digest", |
819 | | ); |
820 | | let local_state = LocalState { |
821 | | resource_name, |
822 | | reader, |
823 | | did_error: false, |
824 | | bytes_received: 0, |
825 | | }; |
826 | | |
827 | 4 | let stream = Box::pin(unfold(local_state, |mut local_state| async move { |
828 | 4 | if local_state.did_error { |
829 | 0 | error!("GrpcStore::update() polled stream after error was returned"); |
830 | 0 | return None; |
831 | 4 | } |
832 | 4 | let data = match local_state |
833 | 4 | .reader |
834 | 4 | .recv() |
835 | 4 | .await |
836 | 4 | .err_tip(|| "In GrpcStore::update()") |
837 | | { |
838 | 4 | Ok(data) => data, |
839 | 0 | Err(err) => { |
840 | 0 | local_state.did_error = true; |
841 | 0 | return Some((Err(err), local_state)); |
842 | | } |
843 | | }; |
844 | | |
845 | 4 | let write_offset = local_state.bytes_received; |
846 | 4 | local_state.bytes_received += data.len() as i64; |
847 | | |
848 | 4 | Some(( |
849 | 4 | Ok(WriteRequest { |
850 | 4 | resource_name: local_state.resource_name.clone(), |
851 | 4 | write_offset, |
852 | 4 | finish_write: data.is_empty(), // EOF is when no data was polled. |
853 | 4 | data, |
854 | 4 | }), |
855 | 4 | local_state, |
856 | 4 | )) |
857 | 8 | })); |
858 | | |
859 | | self.write( |
860 | | WriteRequestStreamWrapper::from(stream) |
861 | | .await |
862 | | .err_tip(|| "in GrpcStore::update()")?, |
863 | | ) |
864 | | .await |
865 | | .err_tip(|| "in GrpcStore::update()")?; |
866 | | |
867 | | Ok(()) |
868 | 2 | } |
869 | | |
870 | | async fn get_part( |
871 | | self: Pin<&Self>, |
872 | | key: StoreKey<'_>, |
873 | | writer: &mut DropCloserWriteHalf, |
874 | | offset: u64, |
875 | | length: Option<u64>, |
876 | 3 | ) -> Result<(), Error> { |
877 | | struct LocalState<'a> { |
878 | | resource_name: String, |
879 | | writer: &'a mut DropCloserWriteHalf, |
880 | | read_offset: i64, |
881 | | read_limit: i64, |
882 | | } |
883 | | |
884 | | let digest = key.into_digest(); |
885 | | if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) { |
886 | | let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?; |
887 | | let length = length |
888 | 0 | .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize")) |
889 | | .transpose()?; |
890 | | |
891 | | return self |
892 | | .get_action_result_as_part(digest, writer, offset, length) |
893 | | .await; |
894 | | } |
895 | | |
896 | | // Shortcut for empty blobs. |
897 | | if digest.size_bytes() == 0 { |
898 | | return writer.send_eof(); |
899 | | } |
900 | | |
901 | | let resource_name = if self.use_legacy_resource_names { |
902 | | format!( |
903 | | "{}/blobs/{}/{}", |
904 | | &self.instance_name, |
905 | | digest.packed_hash(), |
906 | | digest.size_bytes(), |
907 | | ) |
908 | | } else { |
909 | | let digest_function = Context::current() |
910 | | .get::<DigestHasherFunc>() |
911 | | .map_or_else(default_digest_hasher_func, |v| *v) |
912 | | .proto_digest_func() |
913 | | .as_str_name() |
914 | | .to_ascii_lowercase(); |
915 | | format!( |
916 | | "{}/blobs/{}/{}/{}", |
917 | | &self.instance_name, |
918 | | digest_function, |
919 | | digest.packed_hash(), |
920 | | digest.size_bytes(), |
921 | | ) |
922 | | }; |
923 | | |
924 | | let local_state = LocalState { |
925 | | resource_name, |
926 | | writer, |
927 | | read_offset: i64::try_from(offset).err_tip(|| "Could not convert offset to i64")?, |
928 | | read_limit: i64::try_from(length.unwrap_or(0)) |
929 | | .err_tip(|| "Could not convert length to i64")?, |
930 | | }; |
931 | | |
932 | | self.retrier |
933 | 3 | .retry(unfold(local_state, move |mut local_state| async move { |
934 | 3 | let request = ReadRequest { |
935 | 3 | resource_name: local_state.resource_name.clone(), |
936 | 3 | read_offset: local_state.read_offset, |
937 | 3 | read_limit: local_state.read_limit, |
938 | 3 | }; |
939 | 3 | let mut stream = match self |
940 | 3 | .read_internal(request) |
941 | 3 | .await |
942 | 3 | .err_tip(|| "in GrpcStore::get_part()") |
943 | | { |
944 | 3 | Ok(stream) => stream, |
945 | 0 | Err(err) => return Some((RetryResult::Retry(err), local_state)), |
946 | | }; |
947 | | |
948 | | loop { |
949 | 6 | let data = match stream.next().await { |
950 | | // Create an empty response to represent EOF. |
951 | 3 | None => bytes::Bytes::new(), |
952 | 3 | Some(Ok(message)) => message.data, |
953 | 0 | Some(Err(status)) => { |
954 | 0 | return Some(( |
955 | 0 | RetryResult::Retry( |
956 | 0 | Into::<Error>::into(status) |
957 | 0 | .append("While fetching message in GrpcStore::get_part()"), |
958 | 0 | ), |
959 | 0 | local_state, |
960 | 0 | )); |
961 | | } |
962 | | }; |
963 | 6 | let length = data.len() as i64; |
964 | | // This is the usual exit from the loop at EOF. |
965 | 6 | if length == 0 { |
966 | 3 | let eof_result = local_state |
967 | 3 | .writer |
968 | 3 | .send_eof() |
969 | 3 | .err_tip(|| "Could not send eof in GrpcStore::get_part()") |
970 | 3 | .map_or_else(RetryResult::Err, RetryResult::Ok); |
971 | 3 | return Some((eof_result, local_state)); |
972 | 3 | } |
973 | | // Forward the data upstream. |
974 | 3 | if let Err(err0 ) = local_state |
975 | 3 | .writer |
976 | 3 | .send(data) |
977 | 3 | .await |
978 | 3 | .err_tip(|| "While sending in GrpcStore::get_part()") |
979 | | { |
980 | 0 | return Some((RetryResult::Err(err), local_state)); |
981 | 3 | } |
982 | 3 | local_state.read_offset += length; |
983 | | } |
984 | 6 | })) |
985 | | .await |
986 | 3 | } |
987 | | |
    /// `GrpcStore` proxies directly to a remote endpoint and wraps no other
    /// store, so it is its own innermost store for any key.
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }
991 | | |
    /// Exposes this store as `&dyn Any` so callers holding a `StoreDriver`
    /// trait object can downcast back to `GrpcStore`.
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
        self
    }
995 | | |
    /// Arc-flavored counterpart of `as_any`: converts an `Arc<Self>` into an
    /// `Arc<dyn Any>` for downcasting without dropping shared ownership.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
        self
    }
999 | | |
1000 | 0 | fn register_remove_callback( |
1001 | 0 | self: Arc<Self>, |
1002 | 0 | _callback: Arc<dyn RemoveItemCallback>, |
1003 | 0 | ) -> Result<(), Error> { |
1004 | 0 | Err(Error::new( |
1005 | 0 | Code::Internal, |
1006 | 0 | "gRPC stores are incompatible with removal callbacks".to_string(), |
1007 | 0 | )) |
1008 | 0 | } |
1009 | | } |
1010 | | |
// Installs the default health-status-indicator implementation for `GrpcStore`.
default_health_status_indicator!(GrpcStore);