Coverage Report

Created: 2026-04-07 13:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/grpc_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use core::time::Duration;
17
use std::borrow::Cow;
18
use std::sync::Arc;
19
20
use async_trait::async_trait;
21
use bytes::BytesMut;
22
use futures::stream::{FuturesUnordered, unfold};
23
use futures::{Future, Stream, StreamExt, TryFutureExt, TryStreamExt, future};
24
use nativelink_config::stores::GrpcSpec;
25
use nativelink_error::{Error, ResultExt, error_if};
26
use nativelink_metric::MetricsComponent;
27
use nativelink_proto::build::bazel::remote::execution::v2::action_cache_client::ActionCacheClient;
28
use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_client::ContentAddressableStorageClient;
29
use nativelink_proto::build::bazel::remote::execution::v2::{
30
    ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest,
31
    BatchUpdateBlobsResponse, FindMissingBlobsRequest, FindMissingBlobsResponse,
32
    GetActionResultRequest, GetTreeRequest, GetTreeResponse, UpdateActionResultRequest,
33
};
34
use nativelink_proto::google::bytestream::byte_stream_client::ByteStreamClient;
35
use nativelink_proto::google::bytestream::{
36
    QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest,
37
    WriteResponse,
38
};
39
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
40
use nativelink_util::common::DigestInfo;
41
use nativelink_util::connection_manager::ConnectionManager;
42
use nativelink_util::digest_hasher::{DigestHasherFunc, default_digest_hasher_func};
43
use nativelink_util::health_utils::HealthStatusIndicator;
44
use nativelink_util::proto_stream_utils::{
45
    FirstStream, WriteRequestStreamWrapper, WriteState, WriteStateWrapper,
46
};
47
use nativelink_util::resource_info::ResourceInfo;
48
use nativelink_util::retry::{Retrier, RetryResult};
49
use nativelink_util::store_trait::{RemoveItemCallback, StoreDriver, StoreKey, UploadSizeInfo};
50
use nativelink_util::{default_health_status_indicator, tls_utils};
51
use opentelemetry::context::Context;
52
use parking_lot::Mutex;
53
use prost::Message;
54
use tokio::time::sleep;
55
use tonic::{Code, IntoRequest, Request, Response, Status, Streaming};
56
use tracing::{error, trace, warn};
57
use uuid::Uuid;
58
59
// This store is usually a pass-through store, but can also be used as a CAS store. Using it as an
60
// AC store has one major side-effect... The has() function may not give the proper size of the
61
// underlying data. This might cause issues if embedded in certain stores.
62
#[derive(Debug, MetricsComponent)]
63
pub struct GrpcStore {
64
    #[metric(help = "Instance name for the store")]
65
    instance_name: String,
66
    store_type: nativelink_config::stores::StoreType,
67
    retrier: Retrier,
68
    connection_manager: ConnectionManager,
69
    /// Per-RPC timeout. `Duration::ZERO` means disabled.
70
    rpc_timeout: Duration,
71
}
72
73
impl GrpcStore {
74
1
    pub async fn new(spec: &GrpcSpec) -> Result<Arc<Self>, Error> {
75
1
        Self::new_with_jitter(spec, spec.retry.make_jitter_fn()).await
76
1
    }
77
78
1
    pub async fn new_with_jitter(
79
1
        spec: &GrpcSpec,
80
1
        jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>,
81
1
    ) -> Result<Arc<Self>, Error> {
82
0
        error_if!(
83
1
            spec.endpoints.is_empty(),
84
            "Expected at least 1 endpoint in GrpcStore"
85
        );
86
1
        let mut endpoints = Vec::with_capacity(spec.endpoints.len());
87
1
        for endpoint_config in &spec.endpoints {
88
1
            let endpoint = tls_utils::endpoint(endpoint_config).map_err(|e| 
{0
89
0
                Error::from_std_err(Code::InvalidArgument, &e)
90
0
                    .append("Invalid URI for GrpcStore endpoint")
91
0
            })?;
92
1
            endpoints.push(endpoint);
93
        }
94
95
1
        let rpc_timeout = Duration::from_secs(spec.rpc_timeout_s);
96
97
1
        Ok(Arc::new(Self {
98
1
            instance_name: spec.instance_name.clone(),
99
1
            store_type: spec.store_type,
100
1
            retrier: Retrier::new(
101
1
                Arc::new(|duration| 
Box::pin0
(
sleep0
(
duration0
))),
102
1
                jitter_fn.clone(),
103
1
                spec.retry.clone(),
104
            ),
105
1
            connection_manager: ConnectionManager::new(
106
1
                endpoints,
107
1
                spec.connections_per_endpoint,
108
1
                spec.max_concurrent_requests,
109
1
                spec.retry.clone(),
110
1
                jitter_fn,
111
            ),
112
1
            rpc_timeout,
113
        }))
114
1
    }
115
116
0
    async fn perform_request<F, Fut, R, I>(&self, input: I, mut request: F) -> Result<R, Error>
117
0
    where
118
0
        F: FnMut(I) -> Fut + Send + Copy,
119
0
        Fut: Future<Output = Result<R, Error>> + Send,
120
0
        R: Send,
121
0
        I: Send + Clone,
122
0
    {
123
0
        self.retrier
124
0
            .retry(unfold(input, move |input| async move {
125
0
                let input_clone = input.clone();
126
                Some((
127
0
                    request(input_clone)
128
0
                        .await
129
0
                        .map_or_else(RetryResult::Retry, RetryResult::Ok),
130
0
                    input,
131
                ))
132
0
            }))
133
0
            .await
134
0
    }
135
136
1
    pub async fn find_missing_blobs(
137
1
        &self,
138
1
        grpc_request: Request<FindMissingBlobsRequest>,
139
1
    ) -> Result<Response<FindMissingBlobsResponse>, Error> {
140
0
        error_if!(
141
1
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
142
            "CAS operation on AC store"
143
        );
144
145
1
        let mut request = grpc_request.into_inner();
146
147
        // Some builds (Chromium for example) do lots of empty requests for some reason, so shortcut them
148
1
        if request.blob_digests.is_empty() {
149
1
            return Ok(Response::new(FindMissingBlobsResponse {
150
1
                missing_blob_digests: vec![],
151
1
            }));
152
0
        }
153
154
0
        request.instance_name.clone_from(&self.instance_name);
155
0
        self.perform_request(request, |request| async move {
156
0
            let channel = self
157
0
                .connection_manager
158
0
                .connection(format!(
159
0
                    "find_missing_blobs: ({}) {:?}",
160
0
                    request.blob_digests.len(),
161
0
                    request.blob_digests
162
0
                ))
163
0
                .await
164
0
                .err_tip(|| "in find_missing_blobs")?;
165
0
            ContentAddressableStorageClient::new(channel)
166
0
                .find_missing_blobs(Request::new(request))
167
0
                .await
168
0
                .err_tip(|| "in GrpcStore::find_missing_blobs")
169
0
        })
170
0
        .await
171
1
    }
172
173
0
    pub async fn batch_update_blobs(
174
0
        &self,
175
0
        grpc_request: Request<BatchUpdateBlobsRequest>,
176
0
    ) -> Result<Response<BatchUpdateBlobsResponse>, Error> {
177
0
        error_if!(
178
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
179
            "CAS operation on AC store"
180
        );
181
182
0
        let mut request = grpc_request.into_inner();
183
0
        request.instance_name.clone_from(&self.instance_name);
184
0
        self.perform_request(request, |request| async move {
185
0
            let channel = self
186
0
                .connection_manager
187
0
                .connection("batch_update_blobs".into())
188
0
                .await
189
0
                .err_tip(|| "in batch_update_blobs")?;
190
0
            ContentAddressableStorageClient::new(channel)
191
0
                .batch_update_blobs(Request::new(request))
192
0
                .await
193
0
                .err_tip(|| "in GrpcStore::batch_update_blobs")
194
0
        })
195
0
        .await
196
0
    }
197
198
0
    pub async fn batch_read_blobs(
199
0
        &self,
200
0
        grpc_request: Request<BatchReadBlobsRequest>,
201
0
    ) -> Result<Response<BatchReadBlobsResponse>, Error> {
202
0
        error_if!(
203
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
204
            "CAS operation on AC store"
205
        );
206
207
0
        let mut request = grpc_request.into_inner();
208
0
        request.instance_name.clone_from(&self.instance_name);
209
0
        self.perform_request(request, |request| async move {
210
0
            let channel = self
211
0
                .connection_manager
212
0
                .connection("batch_read_blobs".into())
213
0
                .await
214
0
                .err_tip(|| "in batch_read_blobs")?;
215
0
            ContentAddressableStorageClient::new(channel)
216
0
                .batch_read_blobs(Request::new(request))
217
0
                .await
218
0
                .err_tip(|| "in GrpcStore::batch_read_blobs")
219
0
        })
220
0
        .await
221
0
    }
222
223
0
    pub async fn get_tree(
224
0
        &self,
225
0
        grpc_request: Request<GetTreeRequest>,
226
0
    ) -> Result<Response<Streaming<GetTreeResponse>>, Error> {
227
0
        error_if!(
228
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
229
            "CAS operation on AC store"
230
        );
231
232
0
        let mut request = grpc_request.into_inner();
233
0
        request.instance_name.clone_from(&self.instance_name);
234
0
        self.perform_request(request, |request| async move {
235
0
            let channel = self
236
0
                .connection_manager
237
0
                .connection(format!("get_tree: {:?}", request.root_digest))
238
0
                .await
239
0
                .err_tip(|| "in get_tree")?;
240
0
            ContentAddressableStorageClient::new(channel)
241
0
                .get_tree(Request::new(request))
242
0
                .await
243
0
                .err_tip(|| "in GrpcStore::get_tree")
244
0
        })
245
0
        .await
246
0
    }
247
248
0
    fn get_read_request(&self, mut request: ReadRequest) -> Result<ReadRequest, Error> {
249
        const IS_UPLOAD_FALSE: bool = false;
250
0
        let mut resource_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_FALSE)?;
251
0
        if resource_info.instance_name != self.instance_name {
252
0
            resource_info.instance_name = Cow::Borrowed(&self.instance_name);
253
0
            request.resource_name = resource_info.to_string(IS_UPLOAD_FALSE);
254
0
        }
255
0
        Ok(request)
256
0
    }
257
258
0
    async fn read_internal(
259
0
        &self,
260
0
        request: ReadRequest,
261
0
    ) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + use<>, Error> {
262
0
        let channel = self
263
0
            .connection_manager
264
0
            .connection(format!("read_internal: {}", request.resource_name))
265
0
            .await
266
0
            .err_tip(|| "in read_internal")?;
267
0
        let mut response = ByteStreamClient::new(channel)
268
0
            .read(Request::new(request))
269
0
            .await
270
0
            .err_tip(|| "in GrpcStore::read")?
271
0
            .into_inner();
272
0
        let first_response = response
273
0
            .message()
274
0
            .await
275
0
            .err_tip(|| "Fetching first chunk in GrpcStore::read()")?;
276
0
        Ok(FirstStream::new(first_response, response))
277
0
    }
278
279
0
    pub async fn read<R>(
280
0
        &self,
281
0
        grpc_request: R,
282
0
    ) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + use<R>, Error>
283
0
    where
284
0
        R: IntoRequest<ReadRequest>,
285
0
    {
286
0
        error_if!(
287
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
288
            "CAS operation on AC store"
289
        );
290
291
0
        let request = self.get_read_request(grpc_request.into_request().into_inner())?;
292
0
        self.perform_request(request, |request| async move {
293
0
            self.read_internal(request).await
294
0
        })
295
0
        .await
296
0
    }
297
298
0
    pub async fn write<T, E>(
299
0
        &self,
300
0
        stream: WriteRequestStreamWrapper<T>,
301
0
    ) -> Result<Response<WriteResponse>, Error>
302
0
    where
303
0
        T: Stream<Item = Result<WriteRequest, E>> + Unpin + Send + 'static,
304
0
        E: Into<Error> + 'static,
305
0
    {
306
0
        error_if!(
307
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
308
            "CAS operation on AC store"
309
        );
310
311
0
        let local_state = Arc::new(Mutex::new(WriteState::new(
312
0
            self.instance_name.clone(),
313
0
            stream,
314
        )));
315
316
0
        let write_start = std::time::Instant::now();
317
0
        let instance_name = self.instance_name.clone();
318
0
        let rpc_timeout = self.rpc_timeout;
319
0
        trace!(
320
            instance_name = %instance_name,
321
0
            rpc_timeout_s = rpc_timeout.as_secs(),
322
            "GrpcStore::write: starting ByteStream write",
323
        );
324
0
        let mut attempt: u32 = 0;
325
0
        let result = self
326
0
            .retrier
327
0
            .retry(unfold(local_state, move |local_state| {
328
0
                attempt += 1;
329
0
                let instance_name = instance_name.clone();
330
0
                async move {
331
                    // The client write may occur on a separate thread and
332
                    // therefore in order to share the state with it we have to
333
                    // wrap it in a Mutex and retrieve it after the write
334
                    // has completed.  There is no way to get the value back
335
                    // from the client.
336
0
                    trace!(
337
                        instance_name = %instance_name,
338
                        attempt,
339
                        "GrpcStore::write: requesting connection from pool",
340
                    );
341
0
                    let conn_start = std::time::Instant::now();
342
0
                    let rpc_fut = self.connection_manager.connection("write".into()).and_then(
343
0
                        |channel| {
344
0
                            let conn_elapsed = conn_start.elapsed();
345
0
                            let instance_for_rpc = instance_name.clone();
346
0
                            let conn_elapsed_ms =
347
0
                                u64::try_from(conn_elapsed.as_millis()).unwrap_or(u64::MAX);
348
0
                            trace!(
349
                                instance_name = %instance_for_rpc,
350
                                conn_elapsed_ms,
351
                                "GrpcStore::write: got connection, starting ByteStream.Write RPC",
352
                            );
353
0
                            let rpc_start = std::time::Instant::now();
354
0
                            let local_state_for_rpc = local_state.clone();
355
0
                            async move {
356
0
                                let res = ByteStreamClient::new(channel)
357
0
                                    .write(WriteStateWrapper::new(local_state_for_rpc))
358
0
                                    .await
359
0
                                    .err_tip(|| "in GrpcStore::write");
360
0
                                let rpc_elapsed_ms = u64::try_from(rpc_start.elapsed().as_millis())
361
0
                                    .unwrap_or(u64::MAX);
362
0
                                trace!(
363
                                    instance_name = %instance_for_rpc,
364
                                    rpc_elapsed_ms,
365
0
                                    success = res.is_ok(),
366
                                    "GrpcStore::write: ByteStream.Write RPC returned",
367
                                );
368
0
                                res
369
0
                            }
370
0
                        },
371
                    );
372
373
0
                    let result = if rpc_timeout > Duration::ZERO {
374
0
                        match tokio::time::timeout(rpc_timeout, rpc_fut).await {
375
0
                            Ok(res) => res,
376
0
                            Err(_elapsed) => {
377
0
                                warn!(
378
                                    instance_name = %instance_name,
379
                                    attempt,
380
0
                                    rpc_timeout_s = rpc_timeout.as_secs(),
381
                                    "GrpcStore::write: per-RPC timeout exceeded, cancelling",
382
                                );
383
                                #[allow(unused_qualifications)]
384
0
                                Err(nativelink_error::make_err!(
385
0
                                    nativelink_error::Code::DeadlineExceeded,
386
0
                                    "GrpcStore::write RPC timed out after {}s",
387
0
                                    rpc_timeout.as_secs()
388
0
                                ))
389
                            }
390
                        }
391
                    } else {
392
0
                        rpc_fut.await
393
                    };
394
395
                    // Get the state back from StateWrapper, this should be
396
                    // uncontended since write has returned.
397
0
                    let mut local_state_locked = local_state.lock();
398
399
0
                    let result = local_state_locked
400
0
                        .take_read_stream_error()
401
0
                        .map(|err| RetryResult::Err(err.append("Where read_stream_error was set")))
402
0
                        .unwrap_or_else(|| {
403
                            // No stream error, handle the original result
404
0
                            match result {
405
0
                                Ok(response) => RetryResult::Ok(response),
406
0
                                Err(ref err) => {
407
0
                                    warn!(
408
                                        instance_name = %instance_name,
409
                                        attempt,
410
                                        ?err,
411
0
                                        can_resume = local_state_locked.can_resume(),
412
                                        "GrpcStore::write: RPC failed",
413
                                    );
414
0
                                    if local_state_locked.can_resume() {
415
0
                                        local_state_locked.resume();
416
0
                                        RetryResult::Retry(err.clone())
417
                                    } else {
418
0
                                        RetryResult::Err(
419
0
                                            err.clone().append("Retry is not possible"),
420
0
                                        )
421
                                    }
422
                                }
423
                            }
424
0
                        });
425
426
0
                    drop(local_state_locked);
427
0
                    Some((result, local_state))
428
0
                }
429
0
            }))
430
0
            .await?;
431
432
0
        let total_elapsed_ms = u64::try_from(write_start.elapsed().as_millis()).unwrap_or(u64::MAX);
433
0
        trace!(
434
            instance_name = %self.instance_name,
435
            total_elapsed_ms,
436
            "GrpcStore::write: completed successfully",
437
        );
438
0
        Ok(result)
439
0
    }
440
441
0
    pub async fn query_write_status(
442
0
        &self,
443
0
        grpc_request: Request<QueryWriteStatusRequest>,
444
0
    ) -> Result<Response<QueryWriteStatusResponse>, Error> {
445
        const IS_UPLOAD_TRUE: bool = true;
446
447
0
        error_if!(
448
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
449
            "CAS operation on AC store"
450
        );
451
452
0
        let mut request = grpc_request.into_inner();
453
454
0
        let mut request_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_TRUE)?;
455
0
        if request_info.instance_name != self.instance_name {
456
0
            request_info.instance_name = Cow::Borrowed(&self.instance_name);
457
0
            request.resource_name = request_info.to_string(IS_UPLOAD_TRUE);
458
0
        }
459
460
0
        self.perform_request(request, |request| async move {
461
0
            let channel = self
462
0
                .connection_manager
463
0
                .connection(format!("query_write_status: {}", request.resource_name))
464
0
                .await
465
0
                .err_tip(|| "in query_write_status")?;
466
0
            ByteStreamClient::new(channel)
467
0
                .query_write_status(Request::new(request))
468
0
                .await
469
0
                .err_tip(|| "in GrpcStore::query_write_status")
470
0
        })
471
0
        .await
472
0
    }
473
474
0
    pub async fn get_action_result(
475
0
        &self,
476
0
        grpc_request: Request<GetActionResultRequest>,
477
0
    ) -> Result<Response<ActionResult>, Error> {
478
0
        let mut request = grpc_request.into_inner();
479
0
        request.instance_name.clone_from(&self.instance_name);
480
0
        self.perform_request(request, |request| async move {
481
0
            let channel = self
482
0
                .connection_manager
483
0
                .connection(format!("get_action_result: {:?}", request.action_digest))
484
0
                .await
485
0
                .err_tip(|| "in get_action_result")?;
486
0
            ActionCacheClient::new(channel)
487
0
                .get_action_result(Request::new(request))
488
0
                .await
489
0
                .err_tip(|| "in GrpcStore::get_action_result")
490
0
        })
491
0
        .await
492
0
    }
493
494
0
    pub async fn update_action_result(
495
0
        &self,
496
0
        grpc_request: Request<UpdateActionResultRequest>,
497
0
    ) -> Result<Response<ActionResult>, Error> {
498
0
        let mut request = grpc_request.into_inner();
499
0
        request.instance_name.clone_from(&self.instance_name);
500
0
        self.perform_request(request, |request| async move {
501
0
            let channel = self
502
0
                .connection_manager
503
0
                .connection(format!("update_action_result: {:?}", request.action_digest))
504
0
                .await
505
0
                .err_tip(|| "in update_action_result")?;
506
0
            ActionCacheClient::new(channel)
507
0
                .update_action_result(Request::new(request))
508
0
                .await
509
0
                .err_tip(|| "in GrpcStore::update_action_result")
510
0
        })
511
0
        .await
512
0
    }
513
514
0
    async fn get_action_result_from_digest(
515
0
        &self,
516
0
        digest: DigestInfo,
517
0
    ) -> Result<Response<ActionResult>, Error> {
518
0
        let action_result_request = GetActionResultRequest {
519
0
            instance_name: self.instance_name.clone(),
520
0
            action_digest: Some(digest.into()),
521
            inline_stdout: false,
522
            inline_stderr: false,
523
0
            inline_output_files: Vec::new(),
524
0
            digest_function: Context::current()
525
0
                .get::<DigestHasherFunc>()
526
0
                .map_or_else(default_digest_hasher_func, |v| *v)
527
0
                .proto_digest_func()
528
0
                .into(),
529
        };
530
0
        self.get_action_result(Request::new(action_result_request))
531
0
            .await
532
0
    }
533
534
0
    async fn get_action_result_as_part(
535
0
        &self,
536
0
        digest: DigestInfo,
537
0
        writer: &mut DropCloserWriteHalf,
538
0
        offset: usize,
539
0
        length: Option<usize>,
540
0
    ) -> Result<(), Error> {
541
0
        let action_result = self
542
0
            .get_action_result_from_digest(digest)
543
0
            .await
544
0
            .map(Response::into_inner)
545
0
            .err_tip(|| "Action result not found")?;
546
        // TODO: Would be better to avoid all the encoding and decoding in this
547
        //       file, however there's no way to currently get raw bytes from a
548
        //       generated prost request unfortunately.
549
0
        let mut value = BytesMut::new();
550
0
        action_result
551
0
            .encode(&mut value)
552
0
            .err_tip(|| "Could not encode upstream action result")?;
553
554
0
        let default_len = value.len() - offset;
555
0
        let length = length.unwrap_or(default_len).min(default_len);
556
0
        if length > 0 {
557
0
            writer
558
0
                .send(value.freeze().slice(offset..offset + length))
559
0
                .await
560
0
                .err_tip(|| "Failed to write data in grpc store")?;
561
0
        }
562
0
        writer
563
0
            .send_eof()
564
0
            .err_tip(|| "Failed to write EOF in grpc store get_action_result_as_part")?;
565
0
        Ok(())
566
0
    }
567
568
0
    async fn update_action_result_from_bytes(
569
0
        &self,
570
0
        digest: DigestInfo,
571
0
        mut reader: DropCloserReadHalf,
572
0
    ) -> Result<(), Error> {
573
0
        let action_result = ActionResult::decode(reader.consume(None).await?)
574
0
            .err_tip(|| "Failed to decode ActionResult in update_action_result_from_bytes")?;
575
0
        let update_action_request = UpdateActionResultRequest {
576
0
            instance_name: self.instance_name.clone(),
577
0
            action_digest: Some(digest.into()),
578
0
            action_result: Some(action_result),
579
0
            results_cache_policy: None,
580
0
            digest_function: Context::current()
581
0
                .get::<DigestHasherFunc>()
582
0
                .map_or_else(default_digest_hasher_func, |v| *v)
583
0
                .proto_digest_func()
584
0
                .into(),
585
        };
586
0
        self.update_action_result(Request::new(update_action_request))
587
0
            .await
588
0
            .map(|_| ())
589
0
    }
590
}
591
592
#[async_trait]
593
impl StoreDriver for GrpcStore {
594
    // NOTE: This function can only be safely used on CAS stores. AC stores may return a size that
595
    // is incorrect.
596
    async fn has_with_results(
597
        self: Pin<&Self>,
598
        keys: &[StoreKey<'_>],
599
        results: &mut [Option<u64>],
600
0
    ) -> Result<(), Error> {
601
        if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) {
602
            keys.iter()
603
                .zip(results.iter_mut())
604
0
                .map(|(key, result)| async move {
605
                    // The length of an AC is incorrect, so we don't figure out the
606
                    // length, instead the biggest possible result is returned in the
607
                    // hope that we detect incorrect usage.
608
0
                    self.get_action_result_from_digest(key.borrow().into_digest())
609
0
                        .await?;
610
0
                    *result = Some(u64::MAX);
611
0
                    Ok::<_, Error>(())
612
0
                })
613
                .collect::<FuturesUnordered<_>>()
614
0
                .try_for_each(|()| future::ready(Ok(())))
615
                .await
616
                .err_tip(|| "Getting upstream action cache entry")?;
617
            return Ok(());
618
        }
619
620
        let missing_blobs_response = self
621
            .find_missing_blobs(Request::new(FindMissingBlobsRequest {
622
                instance_name: self.instance_name.clone(),
623
                blob_digests: keys
624
                    .iter()
625
0
                    .map(|k| k.borrow().into_digest().into())
626
                    .collect(),
627
                digest_function: Context::current()
628
                    .get::<DigestHasherFunc>()
629
                    .map_or_else(default_digest_hasher_func, |v| *v)
630
                    .proto_digest_func()
631
                    .into(),
632
            }))
633
            .await?
634
            .into_inner();
635
636
        // Since the ordering is not guaranteed above, the matching has to check
637
        // all missing blobs against all entries in the unsorted digest list.
638
        // To optimise this, the missing digests are sorted and then it is
639
        // efficient to perform a binary search for each digest within the
640
        // missing list.
641
        let mut missing_digests =
642
            Vec::with_capacity(missing_blobs_response.missing_blob_digests.len());
643
        for missing_digest in missing_blobs_response.missing_blob_digests {
644
            missing_digests.push(DigestInfo::try_from(missing_digest)?);
645
        }
646
        missing_digests.sort_unstable();
647
        for (digest, result) in keys
648
            .iter()
649
0
            .map(|v| v.borrow().into_digest())
650
            .zip(results.iter_mut())
651
        {
652
            match missing_digests.binary_search(&digest) {
653
                Ok(_) => *result = None,
654
                Err(_) => *result = Some(digest.size_bytes()),
655
            }
656
        }
657
658
        Ok(())
659
0
    }
660
661
    async fn update(
662
        self: Pin<&Self>,
663
        key: StoreKey<'_>,
664
        reader: DropCloserReadHalf,
665
        _size_info: UploadSizeInfo,
666
0
    ) -> Result<(), Error> {
667
        struct LocalState {
668
            resource_name: String,
669
            reader: DropCloserReadHalf,
670
            did_error: bool,
671
            bytes_received: i64,
672
        }
673
674
        let digest = key.into_digest();
675
        if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) {
676
            return self.update_action_result_from_bytes(digest, reader).await;
677
        }
678
679
        let digest_function = Context::current()
680
            .get::<DigestHasherFunc>()
681
            .map_or_else(default_digest_hasher_func, |v| *v)
682
            .proto_digest_func()
683
            .as_str_name()
684
            .to_ascii_lowercase();
685
686
        let mut buf = Uuid::encode_buffer();
687
        let resource_name = format!(
688
            "{}/uploads/{}/blobs/{}/{}/{}",
689
            &self.instance_name,
690
            Uuid::new_v4().hyphenated().encode_lower(&mut buf),
691
            digest_function,
692
            digest.packed_hash(),
693
            digest.size_bytes(),
694
        );
695
        trace!(
696
            resource_name = %resource_name,
697
            digest_hash = %digest.packed_hash(),
698
            digest_size = digest.size_bytes(),
699
            "GrpcStore::update: starting upload for digest",
700
        );
701
        let local_state = LocalState {
702
            resource_name,
703
            reader,
704
            did_error: false,
705
            bytes_received: 0,
706
        };
707
708
0
        let stream = Box::pin(unfold(local_state, |mut local_state| async move {
709
0
            if local_state.did_error {
710
0
                error!("GrpcStore::update() polled stream after error was returned");
711
0
                return None;
712
0
            }
713
0
            let data = match local_state
714
0
                .reader
715
0
                .recv()
716
0
                .await
717
0
                .err_tip(|| "In GrpcStore::update()")
718
            {
719
0
                Ok(data) => data,
720
0
                Err(err) => {
721
0
                    local_state.did_error = true;
722
0
                    return Some((Err(err), local_state));
723
                }
724
            };
725
726
0
            let write_offset = local_state.bytes_received;
727
0
            local_state.bytes_received += data.len() as i64;
728
729
0
            Some((
730
0
                Ok(WriteRequest {
731
0
                    resource_name: local_state.resource_name.clone(),
732
0
                    write_offset,
733
0
                    finish_write: data.is_empty(), // EOF is when no data was polled.
734
0
                    data,
735
0
                }),
736
0
                local_state,
737
0
            ))
738
0
        }));
739
740
        self.write(
741
            WriteRequestStreamWrapper::from(stream)
742
                .await
743
                .err_tip(|| "in GrpcStore::update()")?,
744
        )
745
        .await
746
        .err_tip(|| "in GrpcStore::update()")?;
747
748
        Ok(())
749
0
    }
750
751
    async fn get_part(
752
        self: Pin<&Self>,
753
        key: StoreKey<'_>,
754
        writer: &mut DropCloserWriteHalf,
755
        offset: u64,
756
        length: Option<u64>,
757
0
    ) -> Result<(), Error> {
758
        struct LocalState<'a> {
759
            resource_name: String,
760
            writer: &'a mut DropCloserWriteHalf,
761
            read_offset: i64,
762
            read_limit: i64,
763
        }
764
765
        let digest = key.into_digest();
766
        if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) {
767
            let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?;
768
            let length = length
769
0
                .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
770
                .transpose()?;
771
772
            return self
773
                .get_action_result_as_part(digest, writer, offset, length)
774
                .await;
775
        }
776
777
        // Shortcut for empty blobs.
778
        if digest.size_bytes() == 0 {
779
            return writer.send_eof();
780
        }
781
782
        let digest_function = Context::current()
783
            .get::<DigestHasherFunc>()
784
            .map_or_else(default_digest_hasher_func, |v| *v)
785
            .proto_digest_func()
786
            .as_str_name()
787
            .to_ascii_lowercase();
788
789
        let resource_name = format!(
790
            "{}/blobs/{}/{}/{}",
791
            &self.instance_name,
792
            digest_function,
793
            digest.packed_hash(),
794
            digest.size_bytes(),
795
        );
796
797
        let local_state = LocalState {
798
            resource_name,
799
            writer,
800
            read_offset: i64::try_from(offset).err_tip(|| "Could not convert offset to i64")?,
801
            read_limit: i64::try_from(length.unwrap_or(0))
802
                .err_tip(|| "Could not convert length to i64")?,
803
        };
804
805
        self.retrier
806
0
            .retry(unfold(local_state, move |mut local_state| async move {
807
0
                let request = ReadRequest {
808
0
                    resource_name: local_state.resource_name.clone(),
809
0
                    read_offset: local_state.read_offset,
810
0
                    read_limit: local_state.read_limit,
811
0
                };
812
0
                let mut stream = match self
813
0
                    .read_internal(request)
814
0
                    .await
815
0
                    .err_tip(|| "in GrpcStore::get_part()")
816
                {
817
0
                    Ok(stream) => stream,
818
0
                    Err(err) => return Some((RetryResult::Retry(err), local_state)),
819
                };
820
821
                loop {
822
0
                    let data = match stream.next().await {
823
                        // Create an empty response to represent EOF.
824
0
                        None => bytes::Bytes::new(),
825
0
                        Some(Ok(message)) => message.data,
826
0
                        Some(Err(status)) => {
827
0
                            return Some((
828
0
                                RetryResult::Retry(
829
0
                                    Into::<Error>::into(status)
830
0
                                        .append("While fetching message in GrpcStore::get_part()"),
831
0
                                ),
832
0
                                local_state,
833
0
                            ));
834
                        }
835
                    };
836
0
                    let length = data.len() as i64;
837
                    // This is the usual exit from the loop at EOF.
838
0
                    if length == 0 {
839
0
                        let eof_result = local_state
840
0
                            .writer
841
0
                            .send_eof()
842
0
                            .err_tip(|| "Could not send eof in GrpcStore::get_part()")
843
0
                            .map_or_else(RetryResult::Err, RetryResult::Ok);
844
0
                        return Some((eof_result, local_state));
845
0
                    }
846
                    // Forward the data upstream.
847
0
                    if let Err(err) = local_state
848
0
                        .writer
849
0
                        .send(data)
850
0
                        .await
851
0
                        .err_tip(|| "While sending in GrpcStore::get_part()")
852
                    {
853
0
                        return Some((RetryResult::Err(err), local_state));
854
0
                    }
855
0
                    local_state.read_offset += length;
856
                }
857
0
            }))
858
            .await
859
0
    }
860
861
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
862
0
        self
863
0
    }
864
865
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
866
0
        self
867
0
    }
868
869
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
870
0
        self
871
0
    }
872
873
0
    fn register_remove_callback(
874
0
        self: Arc<Self>,
875
0
        _callback: Arc<dyn RemoveItemCallback>,
876
0
    ) -> Result<(), Error> {
877
0
        Err(Error::new(
878
0
            Code::Internal,
879
0
            "gRPC stores are incompatible with removal callbacks".to_string(),
880
0
        ))
881
0
    }
882
}
883
884
// NOTE(review): the macro is defined elsewhere; presumably it supplies the default
// health-status-indicator implementation for `GrpcStore` — confirm against its definition.
default_health_status_indicator!(GrpcStore);