Coverage Report

Created: 2025-09-16 19:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/grpc_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use core::time::Duration;
17
use std::borrow::Cow;
18
use std::sync::Arc;
19
20
use async_trait::async_trait;
21
use bytes::BytesMut;
22
use futures::stream::{FuturesUnordered, unfold};
23
use futures::{Future, Stream, StreamExt, TryFutureExt, TryStreamExt, future};
24
use nativelink_config::stores::GrpcSpec;
25
use nativelink_error::{Error, ResultExt, error_if, make_input_err};
26
use nativelink_metric::MetricsComponent;
27
use nativelink_proto::build::bazel::remote::execution::v2::action_cache_client::ActionCacheClient;
28
use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_client::ContentAddressableStorageClient;
29
use nativelink_proto::build::bazel::remote::execution::v2::{
30
    ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest,
31
    BatchUpdateBlobsResponse, FindMissingBlobsRequest, FindMissingBlobsResponse,
32
    GetActionResultRequest, GetTreeRequest, GetTreeResponse, UpdateActionResultRequest,
33
};
34
use nativelink_proto::google::bytestream::byte_stream_client::ByteStreamClient;
35
use nativelink_proto::google::bytestream::{
36
    QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest,
37
    WriteResponse,
38
};
39
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
40
use nativelink_util::common::DigestInfo;
41
use nativelink_util::connection_manager::ConnectionManager;
42
use nativelink_util::digest_hasher::{DigestHasherFunc, default_digest_hasher_func};
43
use nativelink_util::health_utils::HealthStatusIndicator;
44
use nativelink_util::proto_stream_utils::{
45
    FirstStream, WriteRequestStreamWrapper, WriteState, WriteStateWrapper,
46
};
47
use nativelink_util::resource_info::ResourceInfo;
48
use nativelink_util::retry::{Retrier, RetryResult};
49
use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
50
use nativelink_util::{default_health_status_indicator, tls_utils};
51
use opentelemetry::context::Context;
52
use parking_lot::Mutex;
53
use prost::Message;
54
use tokio::time::sleep;
55
use tonic::{IntoRequest, Request, Response, Status, Streaming};
56
use tracing::error;
57
use uuid::Uuid;
58
59
// This store is usually a pass-through store, but can also be used as a CAS store. Using it as an
60
// AC store has one major side-effect... The has() function may not give the proper size of the
61
// underlying data. This might cause issues if embedded in certain stores.
62
#[derive(Debug, MetricsComponent)]
63
pub struct GrpcStore {
64
    #[metric(help = "Instance name for the store")]
65
    instance_name: String,
66
    store_type: nativelink_config::stores::StoreType,
67
    retrier: Retrier,
68
    connection_manager: ConnectionManager,
69
}
70
71
impl GrpcStore {
72
0
    pub async fn new(spec: &GrpcSpec) -> Result<Arc<Self>, Error> {
73
0
        Self::new_with_jitter(spec, spec.retry.make_jitter_fn()).await
74
0
    }
75
76
0
    pub async fn new_with_jitter(
77
0
        spec: &GrpcSpec,
78
0
        jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>,
79
0
    ) -> Result<Arc<Self>, Error> {
80
0
        error_if!(
81
0
            spec.endpoints.is_empty(),
  Branch (81:13): [True: 0, False: 0]
  Branch (81:13): [Folded - Ignored]
82
            "Expected at least 1 endpoint in GrpcStore"
83
        );
84
0
        let mut endpoints = Vec::with_capacity(spec.endpoints.len());
85
0
        for endpoint_config in &spec.endpoints {
86
0
            let endpoint = tls_utils::endpoint(endpoint_config)
87
0
                .map_err(|e| make_input_err!("Invalid URI for GrpcStore endpoint : {e:?}"))?;
88
0
            endpoints.push(endpoint);
89
        }
90
91
0
        Ok(Arc::new(Self {
92
0
            instance_name: spec.instance_name.clone(),
93
0
            store_type: spec.store_type,
94
0
            retrier: Retrier::new(
95
0
                Arc::new(|duration| Box::pin(sleep(duration))),
96
0
                jitter_fn.clone(),
97
0
                spec.retry.clone(),
98
            ),
99
0
            connection_manager: ConnectionManager::new(
100
0
                endpoints.into_iter(),
101
0
                spec.connections_per_endpoint,
102
0
                spec.max_concurrent_requests,
103
0
                spec.retry.clone(),
104
0
                jitter_fn,
105
            ),
106
        }))
107
0
    }
108
109
0
    async fn perform_request<F, Fut, R, I>(&self, input: I, mut request: F) -> Result<R, Error>
110
0
    where
111
0
        F: FnMut(I) -> Fut + Send + Copy,
112
0
        Fut: Future<Output = Result<R, Error>> + Send,
113
0
        R: Send,
114
0
        I: Send + Clone,
115
0
    {
116
0
        self.retrier
117
0
            .retry(unfold(input, move |input| async move {
118
0
                let input_clone = input.clone();
119
                Some((
120
0
                    request(input_clone)
121
0
                        .await
122
0
                        .map_or_else(RetryResult::Retry, RetryResult::Ok),
123
0
                    input,
124
                ))
125
0
            }))
126
0
            .await
127
0
    }
128
129
0
    pub async fn find_missing_blobs(
130
0
        &self,
131
0
        grpc_request: Request<FindMissingBlobsRequest>,
132
0
    ) -> Result<Response<FindMissingBlobsResponse>, Error> {
133
0
        error_if!(
134
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
135
            "CAS operation on AC store"
136
        );
137
138
0
        let mut request = grpc_request.into_inner();
139
0
        request.instance_name.clone_from(&self.instance_name);
140
0
        self.perform_request(request, |request| async move {
141
0
            let channel = self
142
0
                .connection_manager
143
0
                .connection()
144
0
                .await
145
0
                .err_tip(|| "in find_missing_blobs")?;
146
0
            ContentAddressableStorageClient::new(channel)
147
0
                .find_missing_blobs(Request::new(request))
148
0
                .await
149
0
                .err_tip(|| "in GrpcStore::find_missing_blobs")
150
0
        })
151
0
        .await
152
0
    }
153
154
0
    pub async fn batch_update_blobs(
155
0
        &self,
156
0
        grpc_request: Request<BatchUpdateBlobsRequest>,
157
0
    ) -> Result<Response<BatchUpdateBlobsResponse>, Error> {
158
0
        error_if!(
159
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
160
            "CAS operation on AC store"
161
        );
162
163
0
        let mut request = grpc_request.into_inner();
164
0
        request.instance_name.clone_from(&self.instance_name);
165
0
        self.perform_request(request, |request| async move {
166
0
            let channel = self
167
0
                .connection_manager
168
0
                .connection()
169
0
                .await
170
0
                .err_tip(|| "in batch_update_blobs")?;
171
0
            ContentAddressableStorageClient::new(channel)
172
0
                .batch_update_blobs(Request::new(request))
173
0
                .await
174
0
                .err_tip(|| "in GrpcStore::batch_update_blobs")
175
0
        })
176
0
        .await
177
0
    }
178
179
0
    pub async fn batch_read_blobs(
180
0
        &self,
181
0
        grpc_request: Request<BatchReadBlobsRequest>,
182
0
    ) -> Result<Response<BatchReadBlobsResponse>, Error> {
183
0
        error_if!(
184
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
185
            "CAS operation on AC store"
186
        );
187
188
0
        let mut request = grpc_request.into_inner();
189
0
        request.instance_name.clone_from(&self.instance_name);
190
0
        self.perform_request(request, |request| async move {
191
0
            let channel = self
192
0
                .connection_manager
193
0
                .connection()
194
0
                .await
195
0
                .err_tip(|| "in batch_read_blobs")?;
196
0
            ContentAddressableStorageClient::new(channel)
197
0
                .batch_read_blobs(Request::new(request))
198
0
                .await
199
0
                .err_tip(|| "in GrpcStore::batch_read_blobs")
200
0
        })
201
0
        .await
202
0
    }
203
204
0
    pub async fn get_tree(
205
0
        &self,
206
0
        grpc_request: Request<GetTreeRequest>,
207
0
    ) -> Result<Response<Streaming<GetTreeResponse>>, Error> {
208
0
        error_if!(
209
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
210
            "CAS operation on AC store"
211
        );
212
213
0
        let mut request = grpc_request.into_inner();
214
0
        request.instance_name.clone_from(&self.instance_name);
215
0
        self.perform_request(request, |request| async move {
216
0
            let channel = self
217
0
                .connection_manager
218
0
                .connection()
219
0
                .await
220
0
                .err_tip(|| "in get_tree")?;
221
0
            ContentAddressableStorageClient::new(channel)
222
0
                .get_tree(Request::new(request))
223
0
                .await
224
0
                .err_tip(|| "in GrpcStore::get_tree")
225
0
        })
226
0
        .await
227
0
    }
228
229
0
    fn get_read_request(&self, mut request: ReadRequest) -> Result<ReadRequest, Error> {
230
        const IS_UPLOAD_FALSE: bool = false;
231
0
        let mut resource_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_FALSE)?;
232
0
        if resource_info.instance_name != self.instance_name {
  Branch (232:12): [True: 0, False: 0]
  Branch (232:12): [Folded - Ignored]
233
0
            resource_info.instance_name = Cow::Borrowed(&self.instance_name);
234
0
            request.resource_name = resource_info.to_string(IS_UPLOAD_FALSE);
235
0
        }
236
0
        Ok(request)
237
0
    }
238
239
0
    async fn read_internal(
240
0
        &self,
241
0
        request: ReadRequest,
242
0
    ) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + use<>, Error> {
243
0
        let channel = self
244
0
            .connection_manager
245
0
            .connection()
246
0
            .await
247
0
            .err_tip(|| "in read_internal")?;
248
0
        let mut response = ByteStreamClient::new(channel)
249
0
            .read(Request::new(request))
250
0
            .await
251
0
            .err_tip(|| "in GrpcStore::read")?
252
0
            .into_inner();
253
0
        let first_response = response
254
0
            .message()
255
0
            .await
256
0
            .err_tip(|| "Fetching first chunk in GrpcStore::read()")?;
257
0
        Ok(FirstStream::new(first_response, response))
258
0
    }
259
260
0
    pub async fn read<R>(
261
0
        &self,
262
0
        grpc_request: R,
263
0
    ) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + use<R>, Error>
264
0
    where
265
0
        R: IntoRequest<ReadRequest>,
266
0
    {
267
0
        error_if!(
268
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
269
            "CAS operation on AC store"
270
        );
271
272
0
        let request = self.get_read_request(grpc_request.into_request().into_inner())?;
273
0
        self.perform_request(request, |request| async move {
274
0
            self.read_internal(request).await
275
0
        })
276
0
        .await
277
0
    }
278
279
0
    pub async fn write<T, E>(
280
0
        &self,
281
0
        stream: WriteRequestStreamWrapper<T>,
282
0
    ) -> Result<Response<WriteResponse>, Error>
283
0
    where
284
0
        T: Stream<Item = Result<WriteRequest, E>> + Unpin + Send + 'static,
285
0
        E: Into<Error> + 'static,
286
0
    {
287
0
        error_if!(
288
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
289
            "CAS operation on AC store"
290
        );
291
292
0
        let local_state = Arc::new(Mutex::new(WriteState::new(
293
0
            self.instance_name.clone(),
294
0
            stream,
295
        )));
296
297
0
        let result = self
298
0
            .retrier
299
0
            .retry(unfold(local_state, move |local_state| async move {
300
                // The client write may occur on a separate thread and
301
                // therefore in order to share the state with it we have to
302
                // wrap it in a Mutex and retrieve it after the write
303
                // has completed.  There is no way to get the value back
304
                // from the client.
305
0
                let result = self
306
0
                    .connection_manager
307
0
                    .connection()
308
0
                    .and_then(|channel| async {
309
0
                        ByteStreamClient::new(channel)
310
0
                            .write(WriteStateWrapper::new(local_state.clone()))
311
0
                            .await
312
0
                            .err_tip(|| "in GrpcStore::write")
313
0
                    })
314
0
                    .await;
315
316
                // Get the state back from StateWrapper, this should be
317
                // uncontended since write has returned.
318
0
                let mut local_state_locked = local_state.lock();
319
320
0
                let result = local_state_locked
321
0
                    .take_read_stream_error()
322
0
                    .map(|err| RetryResult::Err(err.append("Where read_stream_error was set")))
323
0
                    .unwrap_or_else(|| {
324
                        // No stream error, handle the original result
325
0
                        match result {
326
0
                            Ok(response) => RetryResult::Ok(response),
327
0
                            Err(err) => {
328
0
                                if local_state_locked.can_resume() {
  Branch (328:36): [True: 0, False: 0]
  Branch (328:36): [True: 0, False: 0]
  Branch (328:36): [Folded - Ignored]
329
0
                                    local_state_locked.resume();
330
0
                                    RetryResult::Retry(err)
331
                                } else {
332
0
                                    RetryResult::Err(err.append("Retry is not possible"))
333
                                }
334
                            }
335
                        }
336
0
                    });
337
338
0
                drop(local_state_locked);
339
0
                Some((result, local_state))
340
0
            }))
341
0
            .await?;
342
0
        Ok(result)
343
0
    }
344
345
0
    pub async fn query_write_status(
346
0
        &self,
347
0
        grpc_request: Request<QueryWriteStatusRequest>,
348
0
    ) -> Result<Response<QueryWriteStatusResponse>, Error> {
349
        const IS_UPLOAD_TRUE: bool = true;
350
351
0
        error_if!(
352
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
353
            "CAS operation on AC store"
354
        );
355
356
0
        let mut request = grpc_request.into_inner();
357
358
0
        let mut request_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_TRUE)?;
359
0
        if request_info.instance_name != self.instance_name {
  Branch (359:12): [True: 0, False: 0]
  Branch (359:12): [Folded - Ignored]
  Branch (359:12): [Folded - Ignored]
360
0
            request_info.instance_name = Cow::Borrowed(&self.instance_name);
361
0
            request.resource_name = request_info.to_string(IS_UPLOAD_TRUE);
362
0
        }
363
364
0
        self.perform_request(request, |request| async move {
365
0
            let channel = self
366
0
                .connection_manager
367
0
                .connection()
368
0
                .await
369
0
                .err_tip(|| "in query_write_status")?;
370
0
            ByteStreamClient::new(channel)
371
0
                .query_write_status(Request::new(request))
372
0
                .await
373
0
                .err_tip(|| "in GrpcStore::query_write_status")
374
0
        })
375
0
        .await
376
0
    }
377
378
0
    pub async fn get_action_result(
379
0
        &self,
380
0
        grpc_request: Request<GetActionResultRequest>,
381
0
    ) -> Result<Response<ActionResult>, Error> {
382
0
        let mut request = grpc_request.into_inner();
383
0
        request.instance_name.clone_from(&self.instance_name);
384
0
        self.perform_request(request, |request| async move {
385
0
            let channel = self
386
0
                .connection_manager
387
0
                .connection()
388
0
                .await
389
0
                .err_tip(|| "in get_action_result")?;
390
0
            ActionCacheClient::new(channel)
391
0
                .get_action_result(Request::new(request))
392
0
                .await
393
0
                .err_tip(|| "in GrpcStore::get_action_result")
394
0
        })
395
0
        .await
396
0
    }
397
398
0
    pub async fn update_action_result(
399
0
        &self,
400
0
        grpc_request: Request<UpdateActionResultRequest>,
401
0
    ) -> Result<Response<ActionResult>, Error> {
402
0
        let mut request = grpc_request.into_inner();
403
0
        request.instance_name.clone_from(&self.instance_name);
404
0
        self.perform_request(request, |request| async move {
405
0
            let channel = self
406
0
                .connection_manager
407
0
                .connection()
408
0
                .await
409
0
                .err_tip(|| "in update_action_result")?;
410
0
            ActionCacheClient::new(channel)
411
0
                .update_action_result(Request::new(request))
412
0
                .await
413
0
                .err_tip(|| "in GrpcStore::update_action_result")
414
0
        })
415
0
        .await
416
0
    }
417
418
0
    async fn get_action_result_from_digest(
419
0
        &self,
420
0
        digest: DigestInfo,
421
0
    ) -> Result<Response<ActionResult>, Error> {
422
0
        let action_result_request = GetActionResultRequest {
423
0
            instance_name: self.instance_name.clone(),
424
0
            action_digest: Some(digest.into()),
425
            inline_stdout: false,
426
            inline_stderr: false,
427
0
            inline_output_files: Vec::new(),
428
0
            digest_function: Context::current()
429
0
                .get::<DigestHasherFunc>()
430
0
                .map_or_else(default_digest_hasher_func, |v| *v)
431
0
                .proto_digest_func()
432
0
                .into(),
433
        };
434
0
        self.get_action_result(Request::new(action_result_request))
435
0
            .await
436
0
    }
437
438
0
    async fn get_action_result_as_part(
439
0
        &self,
440
0
        digest: DigestInfo,
441
0
        writer: &mut DropCloserWriteHalf,
442
0
        offset: usize,
443
0
        length: Option<usize>,
444
0
    ) -> Result<(), Error> {
445
0
        let action_result = self
446
0
            .get_action_result_from_digest(digest)
447
0
            .await
448
0
            .map(Response::into_inner)
449
0
            .err_tip(|| "Action result not found")?;
450
        // TODO: Would be better to avoid all the encoding and decoding in this
451
        //       file, however there's no way to currently get raw bytes from a
452
        //       generated prost request unfortunately.
453
0
        let mut value = BytesMut::new();
454
0
        action_result
455
0
            .encode(&mut value)
456
0
            .err_tip(|| "Could not encode upstream action result")?;
457
458
0
        let default_len = value.len() - offset;
459
0
        let length = length.unwrap_or(default_len).min(default_len);
460
0
        if length > 0 {
  Branch (460:12): [True: 0, False: 0]
  Branch (460:12): [Folded - Ignored]
461
0
            writer
462
0
                .send(value.freeze().slice(offset..offset + length))
463
0
                .await
464
0
                .err_tip(|| "Failed to write data in grpc store")?;
465
0
        }
466
0
        writer
467
0
            .send_eof()
468
0
            .err_tip(|| "Failed to write EOF in grpc store get_action_result_as_part")?;
469
0
        Ok(())
470
0
    }
471
472
0
    async fn update_action_result_from_bytes(
473
0
        &self,
474
0
        digest: DigestInfo,
475
0
        mut reader: DropCloserReadHalf,
476
0
    ) -> Result<(), Error> {
477
0
        let action_result = ActionResult::decode(reader.consume(None).await?)
478
0
            .err_tip(|| "Failed to decode ActionResult in update_action_result_from_bytes")?;
479
0
        let update_action_request = UpdateActionResultRequest {
480
0
            instance_name: self.instance_name.clone(),
481
0
            action_digest: Some(digest.into()),
482
0
            action_result: Some(action_result),
483
0
            results_cache_policy: None,
484
0
            digest_function: Context::current()
485
0
                .get::<DigestHasherFunc>()
486
0
                .map_or_else(default_digest_hasher_func, |v| *v)
487
0
                .proto_digest_func()
488
0
                .into(),
489
        };
490
0
        self.update_action_result(Request::new(update_action_request))
491
0
            .await
492
0
            .map(|_| ())
493
0
    }
494
}
495
496
#[async_trait]
497
impl StoreDriver for GrpcStore {
498
    // NOTE: This function can only be safely used on CAS stores. AC stores may return a size that
499
    // is incorrect.
500
    async fn has_with_results(
501
        self: Pin<&Self>,
502
        keys: &[StoreKey<'_>],
503
        results: &mut [Option<u64>],
504
0
    ) -> Result<(), Error> {
505
0
        if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) {
506
0
            keys.iter()
507
0
                .zip(results.iter_mut())
508
0
                .map(|(key, result)| async move {
509
                    // The length of an AC is incorrect, so we don't figure out the
510
                    // length, instead the biggest possible result is returned in the
511
                    // hope that we detect incorrect usage.
512
0
                    self.get_action_result_from_digest(key.borrow().into_digest())
513
0
                        .await?;
514
0
                    *result = Some(u64::MAX);
515
0
                    Ok::<_, Error>(())
516
0
                })
517
0
                .collect::<FuturesUnordered<_>>()
518
0
                .try_for_each(|()| future::ready(Ok(())))
519
0
                .await
520
0
                .err_tip(|| "Getting upstream action cache entry")?;
521
0
            return Ok(());
522
0
        }
523
524
0
        let missing_blobs_response = self
525
0
            .find_missing_blobs(Request::new(FindMissingBlobsRequest {
526
0
                instance_name: self.instance_name.clone(),
527
0
                blob_digests: keys
528
0
                    .iter()
529
0
                    .map(|k| k.borrow().into_digest().into())
530
0
                    .collect(),
531
0
                digest_function: Context::current()
532
0
                    .get::<DigestHasherFunc>()
533
0
                    .map_or_else(default_digest_hasher_func, |v| *v)
534
0
                    .proto_digest_func()
535
0
                    .into(),
536
            }))
537
0
            .await?
538
0
            .into_inner();
539
540
        // Since the ordering is not guaranteed above, the matching has to check
541
        // all missing blobs against all entries in the unsorted digest list.
542
        // To optimise this, the missing digests are sorted and then it is
543
        // efficient to perform a binary search for each digest within the
544
        // missing list.
545
0
        let mut missing_digests =
546
0
            Vec::with_capacity(missing_blobs_response.missing_blob_digests.len());
547
0
        for missing_digest in missing_blobs_response.missing_blob_digests {
548
0
            missing_digests.push(DigestInfo::try_from(missing_digest)?);
549
        }
550
0
        missing_digests.sort_unstable();
551
0
        for (digest, result) in keys
552
0
            .iter()
553
0
            .map(|v| v.borrow().into_digest())
554
0
            .zip(results.iter_mut())
555
        {
556
0
            match missing_digests.binary_search(&digest) {
557
0
                Ok(_) => *result = None,
558
0
                Err(_) => *result = Some(digest.size_bytes()),
559
            }
560
        }
561
562
0
        Ok(())
563
0
    }
564
565
    async fn update(
566
        self: Pin<&Self>,
567
        key: StoreKey<'_>,
568
        reader: DropCloserReadHalf,
569
        _size_info: UploadSizeInfo,
570
0
    ) -> Result<(), Error> {
571
0
        let digest = key.into_digest();
572
0
        if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) {
573
0
            return self.update_action_result_from_bytes(digest, reader).await;
574
0
        }
575
576
0
        let digest_function = Context::current()
577
0
            .get::<DigestHasherFunc>()
578
0
            .map_or_else(default_digest_hasher_func, |v| *v)
579
0
            .proto_digest_func()
580
0
            .as_str_name()
581
0
            .to_ascii_lowercase();
582
0
        let mut buf = Uuid::encode_buffer();
583
0
        let resource_name = format!(
584
0
            "{}/uploads/{}/blobs/{}/{}/{}",
585
0
            &self.instance_name,
586
0
            Uuid::new_v4().hyphenated().encode_lower(&mut buf),
587
            digest_function,
588
0
            digest.packed_hash(),
589
0
            digest.size_bytes(),
590
        );
591
592
        struct LocalState {
593
            resource_name: String,
594
            reader: DropCloserReadHalf,
595
            did_error: bool,
596
            bytes_received: i64,
597
        }
598
0
        let local_state = LocalState {
599
0
            resource_name,
600
0
            reader,
601
0
            did_error: false,
602
0
            bytes_received: 0,
603
0
        };
604
605
0
        let stream = Box::pin(unfold(local_state, |mut local_state| async move {
606
0
            if local_state.did_error {
  Branch (606:16): [True: 0, False: 0]
  Branch (606:16): [Folded - Ignored]
607
0
                error!("GrpcStore::update() polled stream after error was returned");
608
0
                return None;
609
0
            }
610
0
            let data = match local_state
611
0
                .reader
612
0
                .recv()
613
0
                .await
614
0
                .err_tip(|| "In GrpcStore::update()")
615
            {
616
0
                Ok(data) => data,
617
0
                Err(err) => {
618
0
                    local_state.did_error = true;
619
0
                    return Some((Err(err), local_state));
620
                }
621
            };
622
623
0
            let write_offset = local_state.bytes_received;
624
0
            local_state.bytes_received += data.len() as i64;
625
626
0
            Some((
627
0
                Ok(WriteRequest {
628
0
                    resource_name: local_state.resource_name.clone(),
629
0
                    write_offset,
630
0
                    finish_write: data.is_empty(), // EOF is when no data was polled.
631
0
                    data,
632
0
                }),
633
0
                local_state,
634
0
            ))
635
0
        }));
636
637
0
        self.write(
638
0
            WriteRequestStreamWrapper::from(stream)
639
0
                .await
640
0
                .err_tip(|| "in GrpcStore::update()")?,
641
        )
642
0
        .await
643
0
        .err_tip(|| "in GrpcStore::update()")?;
644
645
0
        Ok(())
646
0
    }
647
648
    async fn get_part(
649
        self: Pin<&Self>,
650
        key: StoreKey<'_>,
651
        writer: &mut DropCloserWriteHalf,
652
        offset: u64,
653
        length: Option<u64>,
654
0
    ) -> Result<(), Error> {
655
0
        let digest = key.into_digest();
656
0
        if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) {
657
0
            let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?;
658
0
            let length = length
659
0
                .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
660
0
                .transpose()?;
661
662
0
            return self
663
0
                .get_action_result_as_part(digest, writer, offset, length)
664
0
                .await;
665
0
        }
666
667
        // Shortcut for empty blobs.
668
0
        if digest.size_bytes() == 0 {
  Branch (668:12): [True: 0, False: 0]
  Branch (668:12): [Folded - Ignored]
669
0
            return writer.send_eof();
670
0
        }
671
672
0
        let digest_function = Context::current()
673
0
            .get::<DigestHasherFunc>()
674
0
            .map_or_else(default_digest_hasher_func, |v| *v)
675
0
            .proto_digest_func()
676
0
            .as_str_name()
677
0
            .to_ascii_lowercase();
678
0
        let resource_name = format!(
679
0
            "{}/blobs/{}/{}/{}",
680
0
            &self.instance_name,
681
            digest_function,
682
0
            digest.packed_hash(),
683
0
            digest.size_bytes(),
684
        );
685
686
        struct LocalState<'a> {
687
            resource_name: String,
688
            writer: &'a mut DropCloserWriteHalf,
689
            read_offset: i64,
690
            read_limit: i64,
691
        }
692
693
0
        let local_state = LocalState {
694
0
            resource_name,
695
0
            writer,
696
0
            read_offset: i64::try_from(offset).err_tip(|| "Could not convert offset to i64")?,
697
0
            read_limit: i64::try_from(length.unwrap_or(0))
698
0
                .err_tip(|| "Could not convert length to i64")?,
699
        };
700
701
0
        self.retrier
702
0
            .retry(unfold(local_state, move |mut local_state| async move {
703
0
                let request = ReadRequest {
704
0
                    resource_name: local_state.resource_name.clone(),
705
0
                    read_offset: local_state.read_offset,
706
0
                    read_limit: local_state.read_limit,
707
0
                };
708
0
                let mut stream = match self
709
0
                    .read_internal(request)
710
0
                    .await
711
0
                    .err_tip(|| "in GrpcStore::get_part()")
712
                {
713
0
                    Ok(stream) => stream,
714
0
                    Err(err) => return Some((RetryResult::Retry(err), local_state)),
715
                };
716
717
                loop {
718
0
                    let data = match stream.next().await {
719
                        // Create an empty response to represent EOF.
720
0
                        None => bytes::Bytes::new(),
721
0
                        Some(Ok(message)) => message.data,
722
0
                        Some(Err(status)) => {
723
0
                            return Some((
724
0
                                RetryResult::Retry(
725
0
                                    Into::<Error>::into(status)
726
0
                                        .append("While fetching message in GrpcStore::get_part()"),
727
0
                                ),
728
0
                                local_state,
729
0
                            ));
730
                        }
731
                    };
732
0
                    let length = data.len() as i64;
733
                    // This is the usual exit from the loop at EOF.
734
0
                    if length == 0 {
  Branch (734:24): [True: 0, False: 0]
  Branch (734:24): [Folded - Ignored]
735
0
                        let eof_result = local_state
736
0
                            .writer
737
0
                            .send_eof()
738
0
                            .err_tip(|| "Could not send eof in GrpcStore::get_part()")
739
0
                            .map_or_else(RetryResult::Err, RetryResult::Ok);
740
0
                        return Some((eof_result, local_state));
741
0
                    }
742
                    // Forward the data upstream.
743
0
                    if let Err(err) = local_state
  Branch (743:28): [True: 0, False: 0]
  Branch (743:28): [Folded - Ignored]
744
0
                        .writer
745
0
                        .send(data)
746
0
                        .await
747
0
                        .err_tip(|| "While sending in GrpcStore::get_part()")
748
                    {
749
0
                        return Some((RetryResult::Err(err), local_state));
750
0
                    }
751
0
                    local_state.read_offset += length;
752
                }
753
0
            }))
754
0
            .await
755
0
    }
756
757
0
    /// Returns this store itself; the `_digest` argument is ignored.
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
758
0
        self
759
0
    }
760
761
0
    /// Returns this store as a `dyn Any` reference.
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
762
0
        self
763
0
    }
764
765
0
    /// Converts the owning `Arc` of this store into an `Arc<dyn Any>`.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
766
0
        self
767
0
    }
768
}
769
770
// NOTE(review): presumably generates the default `HealthStatusIndicator` implementation for
// `GrpcStore`; the macro body lives in `nativelink_util` and is not visible here — confirm.
default_health_status_indicator!(GrpcStore);