Coverage Report

Created: 2026-05-09 16:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/grpc_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use core::time::Duration;
17
use std::borrow::Cow;
18
use std::sync::Arc;
19
20
use async_trait::async_trait;
21
use bytes::BytesMut;
22
use futures::stream::{FuturesUnordered, unfold};
23
use futures::{Future, Stream, StreamExt, TryFutureExt, TryStreamExt, future};
24
use nativelink_config::stores::GrpcSpec;
25
use nativelink_error::{Error, ResultExt, error_if, make_err};
26
use nativelink_metric::MetricsComponent;
27
use nativelink_proto::build::bazel::remote::execution::v2::action_cache_client::ActionCacheClient;
28
use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_client::ContentAddressableStorageClient;
29
use nativelink_proto::build::bazel::remote::execution::v2::{
30
    ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest,
31
    BatchUpdateBlobsResponse, FindMissingBlobsRequest, FindMissingBlobsResponse,
32
    GetActionResultRequest, GetTreeRequest, GetTreeResponse, UpdateActionResultRequest,
33
};
34
use nativelink_proto::google::bytestream::byte_stream_client::ByteStreamClient;
35
use nativelink_proto::google::bytestream::{
36
    QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest,
37
    WriteResponse,
38
};
39
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
40
use nativelink_util::common::DigestInfo;
41
use nativelink_util::connection_manager::ConnectionManager;
42
use nativelink_util::digest_hasher::{DigestHasherFunc, default_digest_hasher_func};
43
use nativelink_util::health_utils::HealthStatusIndicator;
44
use nativelink_util::proto_stream_utils::{
45
    FirstStream, WriteRequestStreamWrapper, WriteState, WriteStateWrapper,
46
};
47
use nativelink_util::resource_info::ResourceInfo;
48
use nativelink_util::retry::{Retrier, RetryResult};
49
use nativelink_util::store_trait::{RemoveItemCallback, StoreDriver, StoreKey, UploadSizeInfo};
50
use nativelink_util::telemetry::ClientHeaders;
51
use nativelink_util::{default_health_status_indicator, tls_utils};
52
use opentelemetry::context::Context;
53
use opentelemetry::global;
54
use opentelemetry::propagation::Injector;
55
use parking_lot::Mutex;
56
use prost::Message;
57
use tokio::time::sleep;
58
use tonic::metadata::{Ascii, MetadataKey, MetadataValue};
59
use tonic::{Code, IntoRequest, Request, Response, Status, Streaming};
60
use tracing::{error, trace, warn};
61
use uuid::Uuid;
62
63
struct TonicMetadataInjector<'a>(&'a mut tonic::metadata::MetadataMap);
64
65
impl Injector for TonicMetadataInjector<'_> {
66
0
    fn set(&mut self, key: &str, value: String) {
67
0
        if let (Ok(k), Ok(v)) = (
68
0
            MetadataKey::from_bytes(key.as_bytes()),
69
0
            MetadataValue::try_from(&value),
70
0
        ) {
71
0
            self.0.insert(k, v);
72
0
        }
73
0
    }
74
}
75
76
/// Adds configured static headers, forwards nominated client request headers,
77
/// and injects the current OpenTelemetry trace context into an outgoing gRPC
78
/// request.
79
5
fn enrich_request<T>(
80
5
    mut request: Request<T>,
81
5
    headers: &[(MetadataKey<Ascii>, MetadataValue<Ascii>)],
82
5
    forward_headers: &[String],
83
5
) -> Request<T> {
84
5
    for (key, value) in headers {
85
1
        request.metadata_mut().insert(key.clone(), value.clone());
86
1
    }
87
5
    if !forward_headers.is_empty()
88
1
        && let Some(client_headers) = Context::current().get::<ClientHeaders>()
89
    {
90
1
        for name in forward_headers {
91
1
            if let Some(value) = client_headers.0.get(&name.to_lowercase())
92
1
                && let (Ok(k), Ok(v)) = (
93
1
                    MetadataKey::from_bytes(name.as_bytes()),
94
1
                    MetadataValue::try_from(value.as_str()),
95
                )
96
1
            {
97
1
                request.metadata_mut().insert(k, v);
98
1
            
}0
99
        }
100
4
    }
101
5
    global::get_text_map_propagator(|propagator| {
102
5
        propagator.inject(&mut TonicMetadataInjector(request.metadata_mut()));
103
5
    });
104
5
    request
105
5
}
106
107
// This store is usually a pass-through store, but can also be used as a CAS store. Using it as an
108
// AC store has one major side-effect... The has() function may not give the proper size of the
109
// underlying data. This might cause issues if embedded in certain stores.
110
#[derive(Debug, MetricsComponent)]
111
pub struct GrpcStore {
112
    #[metric(help = "Instance name for the store")]
113
    instance_name: String,
114
    store_type: nativelink_config::stores::StoreType,
115
    retrier: Retrier,
116
    connection_manager: ConnectionManager,
117
    /// Per-RPC timeout. `Duration::ZERO` means disabled.
118
    rpc_timeout: Duration,
119
    use_legacy_resource_names: bool,
120
    headers: Vec<(MetadataKey<Ascii>, MetadataValue<Ascii>)>,
121
    forward_headers: Vec<String>,
122
}
123
124
impl GrpcStore {
125
6
    pub async fn new(spec: &GrpcSpec) -> Result<Arc<Self>, Error> {
126
6
        Self::new_with_jitter(spec, spec.retry.make_jitter_fn()).await
127
6
    }
128
129
6
    pub async fn new_with_jitter(
130
6
        spec: &GrpcSpec,
131
6
        jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>,
132
6
    ) -> Result<Arc<Self>, Error> {
133
0
        error_if!(
134
6
            spec.endpoints.is_empty(),
135
            "Expected at least 1 endpoint in GrpcStore"
136
        );
137
6
        let mut endpoints = Vec::with_capacity(spec.endpoints.len());
138
6
        for endpoint_config in &spec.endpoints {
139
6
            let endpoint = tls_utils::endpoint(endpoint_config).map_err(|e| {
140
0
                Error::from_std_err(Code::InvalidArgument, &e)
141
0
                    .append("Invalid URI for GrpcStore endpoint")
142
0
            })?;
143
6
            endpoints.push(endpoint);
144
        }
145
146
6
        let rpc_timeout = Duration::from_secs(spec.rpc_timeout_s);
147
148
6
        let mut headers = Vec::with_capacity(spec.headers.len());
149
6
        for (name, value) in &spec.headers {
150
            // We lowercase keys as HTTP headers are case-insensitive so we should match all cases
151
1
            let key = MetadataKey::from_bytes(name.to_lowercase().as_bytes()).map_err(|_| {
152
0
                make_err!(Code::InvalidArgument, "Invalid gRPC metadata key: {name}")
153
0
            })?;
154
1
            let val = MetadataValue::try_from(value.as_str()).map_err(|_| {
155
0
                make_err!(
156
0
                    Code::InvalidArgument,
157
                    "Invalid gRPC metadata value for key: {name}"
158
                )
159
0
            })?;
160
1
            headers.push((key, val));
161
        }
162
163
6
        Ok(Arc::new(Self {
164
6
            instance_name: spec.instance_name.clone(),
165
6
            store_type: spec.store_type,
166
6
            retrier: Retrier::new(
167
6
                Arc::new(|duration| Box::pin(sleep(duration))),
168
6
                jitter_fn.clone(),
169
6
                spec.retry.clone(),
170
            ),
171
6
            connection_manager: ConnectionManager::new(
172
6
                endpoints,
173
6
                spec.connections_per_endpoint,
174
6
                spec.max_concurrent_requests,
175
6
                spec.retry.clone(),
176
6
                jitter_fn,
177
            ),
178
6
            rpc_timeout,
179
6
            use_legacy_resource_names: spec.use_legacy_resource_names,
180
6
            headers,
181
            // We lowercase keys as HTTP headers are case-insensitive so we should match all cases
182
6
            forward_headers: spec
183
6
                .forward_headers
184
6
                .iter()
185
6
                .map(|s| s.to_lowercase())
186
6
                .collect(),
187
        }))
188
6
    }
189
190
0
    async fn perform_request<F, Fut, R, I>(&self, input: I, mut request: F) -> Result<R, Error>
191
0
    where
192
0
        F: FnMut(I) -> Fut + Send + Copy,
193
0
        Fut: Future<Output = Result<R, Error>> + Send,
194
0
        R: Send,
195
0
        I: Send + Clone,
196
0
    {
197
0
        self.retrier
198
0
            .retry(unfold(input, move |input| async move {
199
0
                let input_clone = input.clone();
200
                Some((
201
0
                    request(input_clone)
202
0
                        .await
203
0
                        .map_or_else(RetryResult::Retry, RetryResult::Ok),
204
0
                    input,
205
                ))
206
0
            }))
207
0
            .await
208
0
    }
209
210
1
    pub async fn find_missing_blobs(
211
1
        &self,
212
1
        grpc_request: Request<FindMissingBlobsRequest>,
213
1
    ) -> Result<Response<FindMissingBlobsResponse>, Error> {
214
0
        error_if!(
215
1
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
216
            "CAS operation on AC store"
217
        );
218
219
1
        let mut request = grpc_request.into_inner();
220
221
        // Some builds (Chromium for example) do lots of empty requests for some reason, so shortcut them
222
1
        if request.blob_digests.is_empty() {
223
1
            return Ok(Response::new(FindMissingBlobsResponse {
224
1
                missing_blob_digests: vec![],
225
1
            }));
226
0
        }
227
228
0
        request.instance_name.clone_from(&self.instance_name);
229
0
        self.perform_request(request, |request| async move {
230
0
            let channel = self
231
0
                .connection_manager
232
0
                .connection(format!(
233
0
                    "find_missing_blobs: ({}) {:?}",
234
0
                    request.blob_digests.len(),
235
0
                    request.blob_digests
236
0
                ))
237
0
                .await
238
0
                .err_tip(|| "in find_missing_blobs")?;
239
0
            ContentAddressableStorageClient::new(channel)
240
0
                .find_missing_blobs(enrich_request(
241
0
                    Request::new(request),
242
0
                    &self.headers,
243
0
                    &self.forward_headers,
244
0
                ))
245
0
                .await
246
0
                .err_tip(|| "in GrpcStore::find_missing_blobs")
247
0
        })
248
0
        .await
249
1
    }
250
251
0
    pub async fn batch_update_blobs(
252
0
        &self,
253
0
        grpc_request: Request<BatchUpdateBlobsRequest>,
254
0
    ) -> Result<Response<BatchUpdateBlobsResponse>, Error> {
255
0
        error_if!(
256
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
257
            "CAS operation on AC store"
258
        );
259
260
0
        let mut request = grpc_request.into_inner();
261
0
        request.instance_name.clone_from(&self.instance_name);
262
0
        self.perform_request(request, |request| async move {
263
0
            let channel = self
264
0
                .connection_manager
265
0
                .connection("batch_update_blobs".into())
266
0
                .await
267
0
                .err_tip(|| "in batch_update_blobs")?;
268
0
            ContentAddressableStorageClient::new(channel)
269
0
                .batch_update_blobs(enrich_request(
270
0
                    Request::new(request),
271
0
                    &self.headers,
272
0
                    &self.forward_headers,
273
0
                ))
274
0
                .await
275
0
                .err_tip(|| "in GrpcStore::batch_update_blobs")
276
0
        })
277
0
        .await
278
0
    }
279
280
0
    pub async fn batch_read_blobs(
281
0
        &self,
282
0
        grpc_request: Request<BatchReadBlobsRequest>,
283
0
    ) -> Result<Response<BatchReadBlobsResponse>, Error> {
284
0
        error_if!(
285
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
286
            "CAS operation on AC store"
287
        );
288
289
0
        let mut request = grpc_request.into_inner();
290
0
        request.instance_name.clone_from(&self.instance_name);
291
0
        self.perform_request(request, |request| async move {
292
0
            let channel = self
293
0
                .connection_manager
294
0
                .connection("batch_read_blobs".into())
295
0
                .await
296
0
                .err_tip(|| "in batch_read_blobs")?;
297
0
            ContentAddressableStorageClient::new(channel)
298
0
                .batch_read_blobs(enrich_request(
299
0
                    Request::new(request),
300
0
                    &self.headers,
301
0
                    &self.forward_headers,
302
0
                ))
303
0
                .await
304
0
                .err_tip(|| "in GrpcStore::batch_read_blobs")
305
0
        })
306
0
        .await
307
0
    }
308
309
0
    pub async fn get_tree(
310
0
        &self,
311
0
        grpc_request: Request<GetTreeRequest>,
312
0
    ) -> Result<Response<Streaming<GetTreeResponse>>, Error> {
313
0
        error_if!(
314
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
315
            "CAS operation on AC store"
316
        );
317
318
0
        let mut request = grpc_request.into_inner();
319
0
        request.instance_name.clone_from(&self.instance_name);
320
0
        self.perform_request(request, |request| async move {
321
0
            let channel = self
322
0
                .connection_manager
323
0
                .connection(format!("get_tree: {:?}", request.root_digest))
324
0
                .await
325
0
                .err_tip(|| "in get_tree")?;
326
0
            ContentAddressableStorageClient::new(channel)
327
0
                .get_tree(enrich_request(
328
0
                    Request::new(request),
329
0
                    &self.headers,
330
0
                    &self.forward_headers,
331
0
                ))
332
0
                .await
333
0
                .err_tip(|| "in GrpcStore::get_tree")
334
0
        })
335
0
        .await
336
0
    }
337
338
0
    fn get_read_request(&self, mut request: ReadRequest) -> Result<ReadRequest, Error> {
339
        const IS_UPLOAD_FALSE: bool = false;
340
0
        let mut resource_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_FALSE)?;
341
0
        if resource_info.instance_name != self.instance_name {
342
0
            resource_info.instance_name = Cow::Borrowed(&self.instance_name);
343
0
            request.resource_name = resource_info.to_string(IS_UPLOAD_FALSE);
344
0
        }
345
0
        Ok(request)
346
0
    }
347
348
3
    async fn read_internal(
349
3
        &self,
350
3
        request: ReadRequest,
351
3
    ) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + use<>, Error> {
352
3
        let channel = self
353
3
            .connection_manager
354
3
            .connection(format!("read_internal: {}", request.resource_name))
355
3
            .await
356
3
            .err_tip(|| "in read_internal")?;
357
3
        let mut response = ByteStreamClient::new(channel)
358
3
            .read(enrich_request(
359
3
                Request::new(request),
360
3
                &self.headers,
361
3
                &self.forward_headers,
362
3
            ))
363
3
            .await
364
3
            .err_tip(|| "in GrpcStore::read")?
365
3
            .into_inner();
366
3
        let first_response = response
367
3
            .message()
368
3
            .await
369
3
            .err_tip(|| "Fetching first chunk in GrpcStore::read()")?;
370
3
        Ok(FirstStream::new(first_response, response))
371
3
    }
372
373
0
    pub async fn read<R>(
374
0
        &self,
375
0
        grpc_request: R,
376
0
    ) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + use<R>, Error>
377
0
    where
378
0
        R: IntoRequest<ReadRequest>,
379
0
    {
380
0
        error_if!(
381
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
382
            "CAS operation on AC store"
383
        );
384
385
0
        let request = self.get_read_request(grpc_request.into_request().into_inner())?;
386
0
        self.perform_request(request, |request| async move {
387
0
            self.read_internal(request).await
388
0
        })
389
0
        .await
390
0
    }
391
392
2
    pub async fn write<T, E>(
393
2
        &self,
394
2
        stream: WriteRequestStreamWrapper<T>,
395
2
    ) -> Result<Response<WriteResponse>, Error>
396
2
    where
397
2
        T: Stream<Item = Result<WriteRequest, E>> + Unpin + Send + 'static,
398
2
        E: Into<Error> + 'static,
399
2
    {
400
0
        error_if!(
401
2
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
402
            "CAS operation on AC store"
403
        );
404
405
2
        let local_state = Arc::new(Mutex::new(WriteState::new(
406
2
            self.instance_name.clone(),
407
2
            stream,
408
        )));
409
410
2
        let write_start = std::time::Instant::now();
411
2
        let instance_name = self.instance_name.clone();
412
2
        let rpc_timeout = self.rpc_timeout;
413
2
        trace!(
414
            instance_name = %instance_name,
415
2
            rpc_timeout_s = rpc_timeout.as_secs(),
416
            "GrpcStore::write: starting ByteStream write",
417
        );
418
2
        let mut attempt: u32 = 0;
419
2
        let result = self
420
2
            .retrier
421
2
            .retry(unfold(local_state, move |local_state| {
422
2
                attempt += 1;
423
2
                let instance_name = instance_name.clone();
424
2
                async move {
425
                    // The client write may occur on a separate thread and
426
                    // therefore in order to share the state with it we have to
427
                    // wrap it in a Mutex and retrieve it after the write
428
                    // has completed.  There is no way to get the value back
429
                    // from the client.
430
2
                    trace!(
431
                        instance_name = %instance_name,
432
                        attempt,
433
                        "GrpcStore::write: requesting connection from pool",
434
                    );
435
2
                    let conn_start = std::time::Instant::now();
436
2
                    let rpc_fut = self.connection_manager.connection("write".into()).and_then(
437
2
                        |channel| {
438
2
                            let conn_elapsed = conn_start.elapsed();
439
2
                            let instance_for_rpc = instance_name.clone();
440
2
                            let conn_elapsed_ms =
441
2
                                u64::try_from(conn_elapsed.as_millis()).unwrap_or(u64::MAX);
442
2
                            trace!(
443
                                instance_name = %instance_for_rpc,
444
                                conn_elapsed_ms,
445
                                "GrpcStore::write: got connection, starting ByteStream.Write RPC",
446
                            );
447
2
                            let rpc_start = std::time::Instant::now();
448
2
                            let local_state_for_rpc = local_state.clone();
449
2
                            async move {
450
2
                                let res = ByteStreamClient::new(channel)
451
2
                                    .write(enrich_request(
452
2
                                        Request::new(WriteStateWrapper::new(local_state_for_rpc)),
453
2
                                        &self.headers,
454
2
                                        &self.forward_headers,
455
2
                                    ))
456
2
                                    .await
457
2
                                    .err_tip(|| "in GrpcStore::write");
458
2
                                let rpc_elapsed_ms = u64::try_from(rpc_start.elapsed().as_millis())
459
2
                                    .unwrap_or(u64::MAX);
460
2
                                trace!(
            }
462
                                    rpc_elapsed_ms,
463
2
                                    success = res.is_ok(),
464
                                    "GrpcStore::write: ByteStream.Write RPC returned",
465
                                );
466
2
                                res
467
2
                            }
468
2
                        },
469
                    );
470
471
2
                    let result = if rpc_timeout > Duration::ZERO {
472
2
                        match tokio::time::timeout(rpc_timeout, rpc_fut).await {
473
2
                            Ok(res) => res,
474
0
                            Err(_elapsed) => {
475
0
                                warn!(
476
                                    instance_name = %instance_name,
477
                                    attempt,
478
0
                                    rpc_timeout_s = rpc_timeout.as_secs(),
479
                                    "GrpcStore::write: per-RPC timeout exceeded, cancelling",
480
                                );
481
                                #[allow(unused_qualifications)]
482
0
                                Err(nativelink_error::make_err!(
483
0
                                    nativelink_error::Code::DeadlineExceeded,
484
0
                                    "GrpcStore::write RPC timed out after {}s",
485
0
                                    rpc_timeout.as_secs()
486
0
                                ))
487
                            }
488
                        }
489
                    } else {
490
0
                        rpc_fut.await
491
                    };
492
493
                    // Get the state back from StateWrapper, this should be
494
                    // uncontended since write has returned.
495
2
                    let mut local_state_locked = local_state.lock();
496
497
2
                    let result = local_state_locked
498
2
                        .take_read_stream_error()
499
2
                        .map(|err| RetryResult::Err(err.append("Where read_stream_error was set")))
500
2
                        .unwrap_or_else(|| {
501
                            // No stream error, handle the original result
502
2
                            match result {
503
2
                                Ok(response) => RetryResult::Ok(response),
504
0
                                Err(ref err) => {
505
0
                                    warn!(
506
                                        instance_name = %instance_name,
507
                                        attempt,
508
                                        ?err,
509
0
                                        can_resume = local_state_locked.can_resume(),
510
                                        "GrpcStore::write: RPC failed",
511
                                    );
512
0
                                    if local_state_locked.can_resume() {
513
0
                                        local_state_locked.resume();
514
0
                                        RetryResult::Retry(err.clone())
515
                                    } else {
516
0
                                        RetryResult::Err(
517
0
                                            err.clone().append("Retry is not possible"),
518
0
                                        )
519
                                    }
520
                                }
521
                            }
522
2
                        });
523
524
2
                    drop(local_state_locked);
525
2
                    Some((result, local_state))
526
2
                }
527
2
            }))
528
2
            .await?;
529
530
2
        let total_elapsed_ms = u64::try_from(write_start.elapsed().as_millis()).unwrap_or(u64::MAX);
531
2
        trace!(
532
            instance_name = %self.instance_name,
533
            total_elapsed_ms,
534
            "GrpcStore::write: completed successfully",
535
        );
536
2
        Ok(result)
537
2
    }
538
539
0
    pub async fn query_write_status(
540
0
        &self,
541
0
        grpc_request: Request<QueryWriteStatusRequest>,
542
0
    ) -> Result<Response<QueryWriteStatusResponse>, Error> {
543
        const IS_UPLOAD_TRUE: bool = true;
544
545
0
        error_if!(
546
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
547
            "CAS operation on AC store"
548
        );
549
550
0
        let mut request = grpc_request.into_inner();
551
552
0
        let mut request_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_TRUE)?;
553
0
        if request_info.instance_name != self.instance_name {
554
0
            request_info.instance_name = Cow::Borrowed(&self.instance_name);
555
0
            request.resource_name = request_info.to_string(IS_UPLOAD_TRUE);
556
0
        }
557
558
0
        self.perform_request(request, |request| async move {
559
0
            let channel = self
560
0
                .connection_manager
561
0
                .connection(format!("query_write_status: {}", request.resource_name))
562
0
                .await
563
0
                .err_tip(|| "in query_write_status")?;
564
0
            ByteStreamClient::new(channel)
565
0
                .query_write_status(enrich_request(
566
0
                    Request::new(request),
567
0
                    &self.headers,
568
0
                    &self.forward_headers,
569
0
                ))
570
0
                .await
571
0
                .err_tip(|| "in GrpcStore::query_write_status")
572
0
        })
573
0
        .await
574
0
    }
575
576
0
    pub async fn get_action_result(
577
0
        &self,
578
0
        grpc_request: Request<GetActionResultRequest>,
579
0
    ) -> Result<Response<ActionResult>, Error> {
580
0
        let mut request = grpc_request.into_inner();
581
0
        request.instance_name.clone_from(&self.instance_name);
582
0
        self.perform_request(request, |request| async move {
583
0
            let channel = self
584
0
                .connection_manager
585
0
                .connection(format!("get_action_result: {:?}", request.action_digest))
586
0
                .await
587
0
                .err_tip(|| "in get_action_result")?;
588
0
            ActionCacheClient::new(channel)
589
0
                .get_action_result(enrich_request(
590
0
                    Request::new(request),
591
0
                    &self.headers,
592
0
                    &self.forward_headers,
593
0
                ))
594
0
                .await
595
0
                .err_tip(|| "in GrpcStore::get_action_result")
596
0
        })
597
0
        .await
598
0
    }
599
600
0
    pub async fn update_action_result(
601
0
        &self,
602
0
        grpc_request: Request<UpdateActionResultRequest>,
603
0
    ) -> Result<Response<ActionResult>, Error> {
604
0
        let mut request = grpc_request.into_inner();
605
0
        request.instance_name.clone_from(&self.instance_name);
606
0
        self.perform_request(request, |request| async move {
607
0
            let channel = self
608
0
                .connection_manager
609
0
                .connection(format!("update_action_result: {:?}", request.action_digest))
610
0
                .await
611
0
                .err_tip(|| "in update_action_result")?;
612
0
            ActionCacheClient::new(channel)
613
0
                .update_action_result(enrich_request(
614
0
                    Request::new(request),
615
0
                    &self.headers,
616
0
                    &self.forward_headers,
617
0
                ))
618
0
                .await
619
0
                .err_tip(|| "in GrpcStore::update_action_result")
620
0
        })
621
0
        .await
622
0
    }
623
624
0
    async fn get_action_result_from_digest(
625
0
        &self,
626
0
        digest: DigestInfo,
627
0
    ) -> Result<Response<ActionResult>, Error> {
628
0
        let action_result_request = GetActionResultRequest {
629
0
            instance_name: self.instance_name.clone(),
630
0
            action_digest: Some(digest.into()),
631
            inline_stdout: false,
632
            inline_stderr: false,
633
0
            inline_output_files: Vec::new(),
634
0
            digest_function: Context::current()
635
0
                .get::<DigestHasherFunc>()
636
0
                .map_or_else(default_digest_hasher_func, |v| *v)
637
0
                .proto_digest_func()
638
0
                .into(),
639
        };
640
0
        self.get_action_result(Request::new(action_result_request))
641
0
            .await
642
0
    }
643
644
0
    async fn get_action_result_as_part(
645
0
        &self,
646
0
        digest: DigestInfo,
647
0
        writer: &mut DropCloserWriteHalf,
648
0
        offset: usize,
649
0
        length: Option<usize>,
650
0
    ) -> Result<(), Error> {
651
0
        let action_result = self
652
0
            .get_action_result_from_digest(digest)
653
0
            .await
654
0
            .map(Response::into_inner)
655
0
            .err_tip(|| "Action result not found")?;
656
        // TODO: Would be better to avoid all the encoding and decoding in this
657
        //       file, however there's no way to currently get raw bytes from a
658
        //       generated prost request unfortunately.
659
0
        let mut value = BytesMut::new();
660
0
        action_result
661
0
            .encode(&mut value)
662
0
            .err_tip(|| "Could not encode upstream action result")?;
663
664
0
        let default_len = value.len() - offset;
665
0
        let length = length.unwrap_or(default_len).min(default_len);
666
0
        if length > 0 {
667
0
            writer
668
0
                .send(value.freeze().slice(offset..offset + length))
669
0
                .await
670
0
                .err_tip(|| "Failed to write data in grpc store")?;
671
0
        }
672
0
        writer
673
0
            .send_eof()
674
0
            .err_tip(|| "Failed to write EOF in grpc store get_action_result_as_part")?;
675
0
        Ok(())
676
0
    }
677
678
0
    async fn update_action_result_from_bytes(
679
0
        &self,
680
0
        digest: DigestInfo,
681
0
        mut reader: DropCloserReadHalf,
682
0
    ) -> Result<(), Error> {
683
0
        let action_result = ActionResult::decode(reader.consume(None).await?)
684
0
            .err_tip(|| "Failed to decode ActionResult in update_action_result_from_bytes")?;
685
0
        let update_action_request = UpdateActionResultRequest {
686
0
            instance_name: self.instance_name.clone(),
687
0
            action_digest: Some(digest.into()),
688
0
            action_result: Some(action_result),
689
0
            results_cache_policy: None,
690
0
            digest_function: Context::current()
691
0
                .get::<DigestHasherFunc>()
692
0
                .map_or_else(default_digest_hasher_func, |v| *v)
693
0
                .proto_digest_func()
694
0
                .into(),
695
        };
696
0
        self.update_action_result(Request::new(update_action_request))
697
0
            .await
698
0
            .map(|_| ())
699
0
    }
700
}
701
702
#[async_trait]
703
impl StoreDriver for GrpcStore {
704
    // NOTE: This function can only be safely used on CAS stores. AC stores may return a size that
705
    // is incorrect.
706
    async fn has_with_results(
707
        self: Pin<&Self>,
708
        keys: &[StoreKey<'_>],
709
        results: &mut [Option<u64>],
710
0
    ) -> Result<(), Error> {
711
        if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) {
712
            keys.iter()
713
                .zip(results.iter_mut())
714
0
                .map(|(key, result)| async move {
715
                    // The length of an AC is incorrect, so we don't figure out the
716
                    // length, instead the biggest possible result is returned in the
717
                    // hope that we detect incorrect usage.
718
0
                    self.get_action_result_from_digest(key.borrow().into_digest())
719
0
                        .await?;
720
0
                    *result = Some(u64::MAX);
721
0
                    Ok::<_, Error>(())
722
0
                })
723
                .collect::<FuturesUnordered<_>>()
724
0
                .try_for_each(|()| future::ready(Ok(())))
725
                .await
726
                .err_tip(|| "Getting upstream action cache entry")?;
727
            return Ok(());
728
        }
729
730
        let missing_blobs_response = self
731
            .find_missing_blobs(Request::new(FindMissingBlobsRequest {
732
                instance_name: self.instance_name.clone(),
733
                blob_digests: keys
734
                    .iter()
735
0
                    .map(|k| k.borrow().into_digest().into())
736
                    .collect(),
737
                digest_function: Context::current()
738
                    .get::<DigestHasherFunc>()
739
                    .map_or_else(default_digest_hasher_func, |v| *v)
740
                    .proto_digest_func()
741
                    .into(),
742
            }))
743
            .await?
744
            .into_inner();
745
746
        // Since the ordering is not guaranteed above, the matching has to check
747
        // all missing blobs against all entries in the unsorted digest list.
748
        // To optimise this, the missing digests are sorted and then it is
749
        // efficient to perform a binary search for each digest within the
750
        // missing list.
751
        let mut missing_digests =
752
            Vec::with_capacity(missing_blobs_response.missing_blob_digests.len());
753
        for missing_digest in missing_blobs_response.missing_blob_digests {
754
            missing_digests.push(DigestInfo::try_from(missing_digest)?);
755
        }
756
        missing_digests.sort_unstable();
757
        for (digest, result) in keys
758
            .iter()
759
0
            .map(|v| v.borrow().into_digest())
760
            .zip(results.iter_mut())
761
        {
762
            match missing_digests.binary_search(&digest) {
763
                Ok(_) => *result = None,
764
                Err(_) => *result = Some(digest.size_bytes()),
765
            }
766
        }
767
768
        Ok(())
769
0
    }
770
771
    async fn update(
772
        self: Pin<&Self>,
773
        key: StoreKey<'_>,
774
        reader: DropCloserReadHalf,
775
        _size_info: UploadSizeInfo,
776
2
    ) -> Result<(), Error> {
777
        struct LocalState {
778
            resource_name: String,
779
            reader: DropCloserReadHalf,
780
            did_error: bool,
781
            bytes_received: i64,
782
        }
783
784
        let digest = key.into_digest();
785
        if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) {
786
            return self.update_action_result_from_bytes(digest, reader).await;
787
        }
788
789
        let mut buf = Uuid::encode_buffer();
790
        let resource_name = if self.use_legacy_resource_names {
791
            format!(
792
                "{}/uploads/{}/blobs/{}/{}",
793
                &self.instance_name,
794
                Uuid::new_v4().hyphenated().encode_lower(&mut buf),
795
                digest.packed_hash(),
796
                digest.size_bytes(),
797
            )
798
        } else {
799
            let digest_function = Context::current()
800
                .get::<DigestHasherFunc>()
801
                .map_or_else(default_digest_hasher_func, |v| *v)
802
                .proto_digest_func()
803
                .as_str_name()
804
                .to_ascii_lowercase();
805
            format!(
806
                "{}/uploads/{}/blobs/{}/{}/{}",
807
                &self.instance_name,
808
                Uuid::new_v4().hyphenated().encode_lower(&mut buf),
809
                digest_function,
810
                digest.packed_hash(),
811
                digest.size_bytes(),
812
            )
813
        };
814
        trace!(
815
            resource_name = %resource_name,
816
            digest_hash = %digest.packed_hash(),
817
            digest_size = digest.size_bytes(),
818
            "GrpcStore::update: starting upload for digest",
819
        );
820
        let local_state = LocalState {
821
            resource_name,
822
            reader,
823
            did_error: false,
824
            bytes_received: 0,
825
        };
826
827
4
        let stream = Box::pin(unfold(local_state, |mut local_state| async move {
828
4
            if local_state.did_error {
829
0
                error!("GrpcStore::update() polled stream after error was returned");
830
0
                return None;
831
4
            }
832
4
            let data = match local_state
833
4
                .reader
834
4
                .recv()
835
4
                .await
836
4
                .err_tip(|| "In GrpcStore::update()")
837
            {
838
4
                Ok(data) => data,
839
0
                Err(err) => {
840
0
                    local_state.did_error = true;
841
0
                    return Some((Err(err), local_state));
842
                }
843
            };
844
845
4
            let write_offset = local_state.bytes_received;
846
4
            local_state.bytes_received += data.len() as i64;
847
848
4
            Some((
849
4
                Ok(WriteRequest {
850
4
                    resource_name: local_state.resource_name.clone(),
851
4
                    write_offset,
852
4
                    finish_write: data.is_empty(), // EOF is when no data was polled.
853
4
                    data,
854
4
                }),
855
4
                local_state,
856
4
            ))
857
8
        }));
858
859
        self.write(
860
            WriteRequestStreamWrapper::from(stream)
861
                .await
862
                .err_tip(|| "in GrpcStore::update()")?,
863
        )
864
        .await
865
        .err_tip(|| "in GrpcStore::update()")?;
866
867
        Ok(())
868
2
    }
869
870
    async fn get_part(
871
        self: Pin<&Self>,
872
        key: StoreKey<'_>,
873
        writer: &mut DropCloserWriteHalf,
874
        offset: u64,
875
        length: Option<u64>,
876
3
    ) -> Result<(), Error> {
877
        struct LocalState<'a> {
878
            resource_name: String,
879
            writer: &'a mut DropCloserWriteHalf,
880
            read_offset: i64,
881
            read_limit: i64,
882
        }
883
884
        let digest = key.into_digest();
885
        if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) {
886
            let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?;
887
            let length = length
888
0
                .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
889
                .transpose()?;
890
891
            return self
892
                .get_action_result_as_part(digest, writer, offset, length)
893
                .await;
894
        }
895
896
        // Shortcut for empty blobs.
897
        if digest.size_bytes() == 0 {
898
            return writer.send_eof();
899
        }
900
901
        let resource_name = if self.use_legacy_resource_names {
902
            format!(
903
                "{}/blobs/{}/{}",
904
                &self.instance_name,
905
                digest.packed_hash(),
906
                digest.size_bytes(),
907
            )
908
        } else {
909
            let digest_function = Context::current()
910
                .get::<DigestHasherFunc>()
911
                .map_or_else(default_digest_hasher_func, |v| *v)
912
                .proto_digest_func()
913
                .as_str_name()
914
                .to_ascii_lowercase();
915
            format!(
916
                "{}/blobs/{}/{}/{}",
917
                &self.instance_name,
918
                digest_function,
919
                digest.packed_hash(),
920
                digest.size_bytes(),
921
            )
922
        };
923
924
        let local_state = LocalState {
925
            resource_name,
926
            writer,
927
            read_offset: i64::try_from(offset).err_tip(|| "Could not convert offset to i64")?,
928
            read_limit: i64::try_from(length.unwrap_or(0))
929
                .err_tip(|| "Could not convert length to i64")?,
930
        };
931
932
        self.retrier
933
3
            .retry(unfold(local_state, move |mut local_state| async move {
934
3
                let request = ReadRequest {
935
3
                    resource_name: local_state.resource_name.clone(),
936
3
                    read_offset: local_state.read_offset,
937
3
                    read_limit: local_state.read_limit,
938
3
                };
939
3
                let mut stream = match self
940
3
                    .read_internal(request)
941
3
                    .await
942
3
                    .err_tip(|| "in GrpcStore::get_part()")
943
                {
944
3
                    Ok(stream) => stream,
945
0
                    Err(err) => return Some((RetryResult::Retry(err), local_state)),
946
                };
947
948
                loop {
949
6
                    let data = match stream.next().await {
950
                        // Create an empty response to represent EOF.
951
3
                        None => bytes::Bytes::new(),
952
3
                        Some(Ok(message)) => message.data,
953
0
                        Some(Err(status)) => {
954
0
                            return Some((
955
0
                                RetryResult::Retry(
956
0
                                    Into::<Error>::into(status)
957
0
                                        .append("While fetching message in GrpcStore::get_part()"),
958
0
                                ),
959
0
                                local_state,
960
0
                            ));
961
                        }
962
                    };
963
6
                    let length = data.len() as i64;
964
                    // This is the usual exit from the loop at EOF.
965
6
                    if length == 0 {
966
3
                        let eof_result = local_state
967
3
                            .writer
968
3
                            .send_eof()
969
3
                            .err_tip(|| "Could not send eof in GrpcStore::get_part()")
970
3
                            .map_or_else(RetryResult::Err, RetryResult::Ok);
971
3
                        return Some((eof_result, local_state));
972
3
                    }
973
                    // Forward the data upstream.
974
3
                    if let Err(
err0
) = local_state
975
3
                        .writer
976
3
                        .send(data)
977
3
                        .await
978
3
                        .err_tip(|| "While sending in GrpcStore::get_part()")
979
                    {
980
0
                        return Some((RetryResult::Err(err), local_state));
981
3
                    }
982
3
                    local_state.read_offset += length;
983
                }
984
6
            }))
985
            .await
986
3
    }
987
988
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        // This store does not wrap another store; it is its own leaf driver.
        self
    }
991
992
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
993
0
        self
994
0
    }
995
996
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
997
0
        self
998
0
    }
999
1000
0
    fn register_remove_callback(
1001
0
        self: Arc<Self>,
1002
0
        _callback: Arc<dyn RemoveItemCallback>,
1003
0
    ) -> Result<(), Error> {
1004
0
        Err(Error::new(
1005
0
            Code::Internal,
1006
0
            "gRPC stores are incompatible with removal callbacks".to_string(),
1007
0
        ))
1008
0
    }
1009
}
1010
1011
// Installs the project's default health-status indicator for GrpcStore
// (macro defined elsewhere in the project; see its definition for details).
default_health_status_indicator!(GrpcStore);