Coverage Report

Created: 2025-04-19 16:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/grpc_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::pin::Pin;
17
use std::sync::Arc;
18
use std::time::Duration;
19
20
use async_trait::async_trait;
21
use bytes::BytesMut;
22
use futures::stream::{FuturesUnordered, unfold};
23
use futures::{Future, Stream, StreamExt, TryFutureExt, TryStreamExt, future};
24
use nativelink_config::stores::GrpcSpec;
25
use nativelink_error::{Error, ResultExt, error_if, make_input_err};
26
use nativelink_metric::MetricsComponent;
27
use nativelink_proto::build::bazel::remote::execution::v2::action_cache_client::ActionCacheClient;
28
use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_client::ContentAddressableStorageClient;
29
use nativelink_proto::build::bazel::remote::execution::v2::{
30
    ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest,
31
    BatchUpdateBlobsResponse, FindMissingBlobsRequest, FindMissingBlobsResponse,
32
    GetActionResultRequest, GetTreeRequest, GetTreeResponse, UpdateActionResultRequest,
33
};
34
use nativelink_proto::google::bytestream::byte_stream_client::ByteStreamClient;
35
use nativelink_proto::google::bytestream::{
36
    QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest,
37
    WriteResponse,
38
};
39
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
40
use nativelink_util::common::DigestInfo;
41
use nativelink_util::connection_manager::ConnectionManager;
42
use nativelink_util::digest_hasher::{ACTIVE_HASHER_FUNC, default_digest_hasher_func};
43
use nativelink_util::health_utils::HealthStatusIndicator;
44
use nativelink_util::origin_context::ActiveOriginContext;
45
use nativelink_util::proto_stream_utils::{
46
    FirstStream, WriteRequestStreamWrapper, WriteState, WriteStateWrapper,
47
};
48
use nativelink_util::resource_info::ResourceInfo;
49
use nativelink_util::retry::{Retrier, RetryResult};
50
use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
51
use nativelink_util::{default_health_status_indicator, tls_utils};
52
use parking_lot::Mutex;
53
use prost::Message;
54
use rand::Rng;
55
use tokio::time::sleep;
56
use tonic::{IntoRequest, Request, Response, Status, Streaming};
57
use tracing::{Level, event};
58
use uuid::Uuid;
59
60
// This store is usually a pass-through store, but can also be used as a CAS store. Using it as an
61
// AC store has one major side-effect... The has() function may not give the proper size of the
62
// underlying data. This might cause issues if embedded in certain stores.
63
#[derive(Debug, MetricsComponent)]
64
pub struct GrpcStore {
65
    #[metric(help = "Instance name for the store")]
66
    instance_name: String,
67
    store_type: nativelink_config::stores::StoreType,
68
    retrier: Retrier,
69
    connection_manager: ConnectionManager,
70
}
71
72
impl GrpcStore {
73
0
    pub async fn new(spec: &GrpcSpec) -> Result<Arc<Self>, Error> {
74
0
        let jitter_amt = spec.retry.jitter;
75
0
        Self::new_with_jitter(
76
0
            spec,
77
0
            Box::new(move |delay: Duration| {
78
0
                if jitter_amt == 0. {
  Branch (78:20): [True: 0, False: 0]
  Branch (78:20): [Folded - Ignored]
79
0
                    return delay;
80
0
                }
81
0
                let min = 1. - (jitter_amt / 2.);
82
0
                let max = 1. + (jitter_amt / 2.);
83
0
                delay.mul_f32(rand::rng().random_range(min..max))
84
0
            }),
85
        )
86
0
        .await
87
0
    }
88
89
0
    pub async fn new_with_jitter(
90
0
        spec: &GrpcSpec,
91
0
        jitter_fn: Box<dyn Fn(Duration) -> Duration + Send + Sync>,
92
0
    ) -> Result<Arc<Self>, Error> {
93
0
        error_if!(
94
0
            spec.endpoints.is_empty(),
  Branch (94:13): [True: 0, False: 0]
  Branch (94:13): [Folded - Ignored]
95
            "Expected at least 1 endpoint in GrpcStore"
96
        );
97
0
        let mut endpoints = Vec::with_capacity(spec.endpoints.len());
98
0
        for endpoint_config in &spec.endpoints {
99
0
            let endpoint = tls_utils::endpoint(endpoint_config)
100
0
                .map_err(|e| make_input_err!("Invalid URI for GrpcStore endpoint : {e:?}"))?;
101
0
            endpoints.push(endpoint);
102
        }
103
104
0
        let jitter_fn = Arc::new(jitter_fn);
105
0
        Ok(Arc::new(GrpcStore {
106
0
            instance_name: spec.instance_name.clone(),
107
0
            store_type: spec.store_type,
108
0
            retrier: Retrier::new(
109
0
                Arc::new(|duration| Box::pin(sleep(duration))),
110
0
                jitter_fn.clone(),
111
0
                spec.retry.clone(),
112
0
            ),
113
0
            connection_manager: ConnectionManager::new(
114
0
                endpoints.into_iter(),
115
0
                spec.connections_per_endpoint,
116
0
                spec.max_concurrent_requests,
117
0
                spec.retry.clone(),
118
0
                jitter_fn,
119
            ),
120
        }))
121
0
    }
122
123
0
    async fn perform_request<F, Fut, R, I>(&self, input: I, mut request: F) -> Result<R, Error>
124
0
    where
125
0
        F: FnMut(I) -> Fut + Send + Copy,
126
0
        Fut: Future<Output = Result<R, Error>> + Send,
127
0
        R: Send,
128
0
        I: Send + Clone,
129
0
    {
130
0
        self.retrier
131
0
            .retry(unfold(input, move |input| async move {
132
0
                let input_clone = input.clone();
133
0
                Some((
134
0
                    request(input_clone)
135
0
                        .await
136
0
                        .map_or_else(RetryResult::Retry, RetryResult::Ok),
137
0
                    input,
138
0
                ))
139
0
            }))
140
0
            .await
141
0
    }
142
143
0
    pub async fn find_missing_blobs(
144
0
        &self,
145
0
        grpc_request: Request<FindMissingBlobsRequest>,
146
0
    ) -> Result<Response<FindMissingBlobsResponse>, Error> {
147
0
        error_if!(
148
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
149
            "CAS operation on AC store"
150
        );
151
152
0
        let mut request = grpc_request.into_inner();
153
0
        request.instance_name.clone_from(&self.instance_name);
154
0
        self.perform_request(request, |request| async move {
155
0
            let channel = self
156
0
                .connection_manager
157
0
                .connection()
158
0
                .await
159
0
                .err_tip(|| "in find_missing_blobs")?;
160
0
            ContentAddressableStorageClient::new(channel)
161
0
                .find_missing_blobs(Request::new(request))
162
0
                .await
163
0
                .err_tip(|| "in GrpcStore::find_missing_blobs")
164
0
        })
165
0
        .await
166
0
    }
167
168
0
    pub async fn batch_update_blobs(
169
0
        &self,
170
0
        grpc_request: Request<BatchUpdateBlobsRequest>,
171
0
    ) -> Result<Response<BatchUpdateBlobsResponse>, Error> {
172
0
        error_if!(
173
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
174
            "CAS operation on AC store"
175
        );
176
177
0
        let mut request = grpc_request.into_inner();
178
0
        request.instance_name.clone_from(&self.instance_name);
179
0
        self.perform_request(request, |request| async move {
180
0
            let channel = self
181
0
                .connection_manager
182
0
                .connection()
183
0
                .await
184
0
                .err_tip(|| "in batch_update_blobs")?;
185
0
            ContentAddressableStorageClient::new(channel)
186
0
                .batch_update_blobs(Request::new(request))
187
0
                .await
188
0
                .err_tip(|| "in GrpcStore::batch_update_blobs")
189
0
        })
190
0
        .await
191
0
    }
192
193
0
    pub async fn batch_read_blobs(
194
0
        &self,
195
0
        grpc_request: Request<BatchReadBlobsRequest>,
196
0
    ) -> Result<Response<BatchReadBlobsResponse>, Error> {
197
0
        error_if!(
198
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
199
            "CAS operation on AC store"
200
        );
201
202
0
        let mut request = grpc_request.into_inner();
203
0
        request.instance_name.clone_from(&self.instance_name);
204
0
        self.perform_request(request, |request| async move {
205
0
            let channel = self
206
0
                .connection_manager
207
0
                .connection()
208
0
                .await
209
0
                .err_tip(|| "in batch_read_blobs")?;
210
0
            ContentAddressableStorageClient::new(channel)
211
0
                .batch_read_blobs(Request::new(request))
212
0
                .await
213
0
                .err_tip(|| "in GrpcStore::batch_read_blobs")
214
0
        })
215
0
        .await
216
0
    }
217
218
0
    pub async fn get_tree(
219
0
        &self,
220
0
        grpc_request: Request<GetTreeRequest>,
221
0
    ) -> Result<Response<Streaming<GetTreeResponse>>, Error> {
222
0
        error_if!(
223
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
224
            "CAS operation on AC store"
225
        );
226
227
0
        let mut request = grpc_request.into_inner();
228
0
        request.instance_name.clone_from(&self.instance_name);
229
0
        self.perform_request(request, |request| async move {
230
0
            let channel = self
231
0
                .connection_manager
232
0
                .connection()
233
0
                .await
234
0
                .err_tip(|| "in get_tree")?;
235
0
            ContentAddressableStorageClient::new(channel)
236
0
                .get_tree(Request::new(request))
237
0
                .await
238
0
                .err_tip(|| "in GrpcStore::get_tree")
239
0
        })
240
0
        .await
241
0
    }
242
243
0
    fn get_read_request(&self, mut request: ReadRequest) -> Result<ReadRequest, Error> {
244
        const IS_UPLOAD_FALSE: bool = false;
245
0
        let mut resource_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_FALSE)?;
246
0
        if resource_info.instance_name != self.instance_name {
  Branch (246:12): [True: 0, False: 0]
  Branch (246:12): [Folded - Ignored]
247
0
            resource_info.instance_name = Cow::Borrowed(&self.instance_name);
248
0
            request.resource_name = resource_info.to_string(IS_UPLOAD_FALSE);
249
0
        }
250
0
        Ok(request)
251
0
    }
252
253
0
    async fn read_internal(
254
0
        &self,
255
0
        request: ReadRequest,
256
0
    ) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + use<>, Error> {
257
0
        let channel = self
258
0
            .connection_manager
259
0
            .connection()
260
0
            .await
261
0
            .err_tip(|| "in read_internal")?;
262
0
        let mut response = ByteStreamClient::new(channel)
263
0
            .read(Request::new(request))
264
0
            .await
265
0
            .err_tip(|| "in GrpcStore::read")?
266
0
            .into_inner();
267
0
        let first_response = response
268
0
            .message()
269
0
            .await
270
0
            .err_tip(|| "Fetching first chunk in GrpcStore::read()")?;
271
0
        Ok(FirstStream::new(first_response, response))
272
0
    }
273
274
0
    pub async fn read<R>(
275
0
        &self,
276
0
        grpc_request: R,
277
0
    ) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + use<R>, Error>
278
0
    where
279
0
        R: IntoRequest<ReadRequest>,
280
0
    {
281
0
        error_if!(
282
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
283
            "CAS operation on AC store"
284
        );
285
286
0
        let request = self.get_read_request(grpc_request.into_request().into_inner())?;
287
0
        self.perform_request(request, |request| async move {
288
0
            self.read_internal(request).await
289
0
        })
290
0
        .await
291
0
    }
292
293
0
    pub async fn write<T, E>(
294
0
        &self,
295
0
        stream: WriteRequestStreamWrapper<T>,
296
0
    ) -> Result<Response<WriteResponse>, Error>
297
0
    where
298
0
        T: Stream<Item = Result<WriteRequest, E>> + Unpin + Send + 'static,
299
0
        E: Into<Error> + 'static,
300
0
    {
301
0
        error_if!(
302
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
303
            "CAS operation on AC store"
304
        );
305
306
0
        let local_state = Arc::new(Mutex::new(WriteState::new(
307
0
            self.instance_name.clone(),
308
0
            stream,
309
        )));
310
311
0
        let result = self
312
0
            .retrier
313
0
            .retry(unfold(local_state, move |local_state| async move {
314
                // The client write may occur on a separate thread and
315
                // therefore in order to share the state with it we have to
316
                // wrap it in a Mutex and retrieve it after the write
317
                // has completed.  There is no way to get the value back
318
                // from the client.
319
0
                let result = self
320
0
                    .connection_manager
321
0
                    .connection()
322
0
                    .and_then(|channel| async {
323
0
                        ByteStreamClient::new(channel)
324
0
                            .write(WriteStateWrapper::new(local_state.clone()))
325
0
                            .await
326
0
                            .err_tip(|| "in GrpcStore::write")
327
0
                    })
328
0
                    .await;
329
330
                // Get the state back from StateWrapper, this should be
331
                // uncontended since write has returned.
332
0
                let mut local_state_locked = local_state.lock();
333
334
0
                let result = if let Some(err) = local_state_locked.take_read_stream_error() {
  Branch (334:37): [True: 0, False: 0]
  Branch (334:37): [True: 0, False: 0]
  Branch (334:37): [Folded - Ignored]
335
                    // If there was an error with the stream, then don't retry.
336
0
                    RetryResult::Err(err.append("Where read_stream_error was set"))
337
                } else {
338
                    // On error determine whether it is possible to retry.
339
0
                    match result {
340
0
                        Err(err) => {
341
0
                            if local_state_locked.can_resume() {
  Branch (341:32): [True: 0, False: 0]
  Branch (341:32): [True: 0, False: 0]
  Branch (341:32): [Folded - Ignored]
342
0
                                local_state_locked.resume();
343
0
                                RetryResult::Retry(err)
344
                            } else {
345
0
                                RetryResult::Err(err.append("Retry is not possible"))
346
                            }
347
                        }
348
0
                        Ok(response) => RetryResult::Ok(response),
349
                    }
350
                };
351
352
0
                drop(local_state_locked);
353
0
                Some((result, local_state))
354
0
            }))
355
0
            .await?;
356
0
        Ok(result)
357
0
    }
358
359
0
    pub async fn query_write_status(
360
0
        &self,
361
0
        grpc_request: Request<QueryWriteStatusRequest>,
362
0
    ) -> Result<Response<QueryWriteStatusResponse>, Error> {
363
        const IS_UPLOAD_TRUE: bool = true;
364
365
0
        error_if!(
366
0
            matches!(self.store_type, nativelink_config::stores::StoreType::Ac),
367
            "CAS operation on AC store"
368
        );
369
370
0
        let mut request = grpc_request.into_inner();
371
372
0
        let mut request_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_TRUE)?;
373
0
        if request_info.instance_name != self.instance_name {
  Branch (373:12): [True: 0, False: 0]
  Branch (373:12): [Folded - Ignored]
  Branch (373:12): [Folded - Ignored]
374
0
            request_info.instance_name = Cow::Borrowed(&self.instance_name);
375
0
            request.resource_name = request_info.to_string(IS_UPLOAD_TRUE);
376
0
        }
377
378
0
        self.perform_request(request, |request| async move {
379
0
            let channel = self
380
0
                .connection_manager
381
0
                .connection()
382
0
                .await
383
0
                .err_tip(|| "in query_write_status")?;
384
0
            ByteStreamClient::new(channel)
385
0
                .query_write_status(Request::new(request))
386
0
                .await
387
0
                .err_tip(|| "in GrpcStore::query_write_status")
388
0
        })
389
0
        .await
390
0
    }
391
392
0
    pub async fn get_action_result(
393
0
        &self,
394
0
        grpc_request: Request<GetActionResultRequest>,
395
0
    ) -> Result<Response<ActionResult>, Error> {
396
0
        let mut request = grpc_request.into_inner();
397
0
        request.instance_name.clone_from(&self.instance_name);
398
0
        self.perform_request(request, |request| async move {
399
0
            let channel = self
400
0
                .connection_manager
401
0
                .connection()
402
0
                .await
403
0
                .err_tip(|| "in get_action_result")?;
404
0
            ActionCacheClient::new(channel)
405
0
                .get_action_result(Request::new(request))
406
0
                .await
407
0
                .err_tip(|| "in GrpcStore::get_action_result")
408
0
        })
409
0
        .await
410
0
    }
411
412
0
    pub async fn update_action_result(
413
0
        &self,
414
0
        grpc_request: Request<UpdateActionResultRequest>,
415
0
    ) -> Result<Response<ActionResult>, Error> {
416
0
        let mut request = grpc_request.into_inner();
417
0
        request.instance_name.clone_from(&self.instance_name);
418
0
        self.perform_request(request, |request| async move {
419
0
            let channel = self
420
0
                .connection_manager
421
0
                .connection()
422
0
                .await
423
0
                .err_tip(|| "in update_action_result")?;
424
0
            ActionCacheClient::new(channel)
425
0
                .update_action_result(Request::new(request))
426
0
                .await
427
0
                .err_tip(|| "in GrpcStore::update_action_result")
428
0
        })
429
0
        .await
430
0
    }
431
432
0
    async fn get_action_result_from_digest(
433
0
        &self,
434
0
        digest: DigestInfo,
435
0
    ) -> Result<Response<ActionResult>, Error> {
436
0
        let action_result_request = GetActionResultRequest {
437
0
            instance_name: self.instance_name.clone(),
438
0
            action_digest: Some(digest.into()),
439
0
            inline_stdout: false,
440
0
            inline_stderr: false,
441
0
            inline_output_files: Vec::new(),
442
0
            digest_function: ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC)
443
0
                .err_tip(|| "In get_action_from_store")?
444
0
                .map_or_else(default_digest_hasher_func, |v| *v)
445
0
                .proto_digest_func()
446
0
                .into(),
447
0
        };
448
0
        self.get_action_result(Request::new(action_result_request))
449
0
            .await
450
0
    }
451
452
0
    async fn get_action_result_as_part(
453
0
        &self,
454
0
        digest: DigestInfo,
455
0
        writer: &mut DropCloserWriteHalf,
456
0
        offset: usize,
457
0
        length: Option<usize>,
458
0
    ) -> Result<(), Error> {
459
0
        let action_result = self
460
0
            .get_action_result_from_digest(digest)
461
0
            .await
462
0
            .map(Response::into_inner)
463
0
            .err_tip(|| "Action result not found")?;
464
        // TODO: Would be better to avoid all the encoding and decoding in this
465
        //       file, however there's no way to currently get raw bytes from a
466
        //       generated prost request unfortunately.
467
0
        let mut value = BytesMut::new();
468
0
        action_result
469
0
            .encode(&mut value)
470
0
            .err_tip(|| "Could not encode upstream action result")?;
471
472
0
        let default_len = value.len() - offset;
473
0
        let length = length.unwrap_or(default_len).min(default_len);
474
0
        if length > 0 {
  Branch (474:12): [True: 0, False: 0]
  Branch (474:12): [Folded - Ignored]
475
0
            writer
476
0
                .send(value.freeze().slice(offset..offset + length))
477
0
                .await
478
0
                .err_tip(|| "Failed to write data in grpc store")?;
479
0
        }
480
0
        writer
481
0
            .send_eof()
482
0
            .err_tip(|| "Failed to write EOF in grpc store get_action_result_as_part")?;
483
0
        Ok(())
484
0
    }
485
486
0
    async fn update_action_result_from_bytes(
487
0
        &self,
488
0
        digest: DigestInfo,
489
0
        mut reader: DropCloserReadHalf,
490
0
    ) -> Result<(), Error> {
491
0
        let action_result = ActionResult::decode(reader.consume(None).await?)
492
0
            .err_tip(|| "Failed to decode ActionResult in update_action_result_from_bytes")?;
493
0
        let update_action_request = UpdateActionResultRequest {
494
0
            instance_name: self.instance_name.clone(),
495
0
            action_digest: Some(digest.into()),
496
0
            action_result: Some(action_result),
497
0
            results_cache_policy: None,
498
0
            digest_function: ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC)
499
0
                .err_tip(|| "In get_action_from_store")?
500
0
                .map_or_else(default_digest_hasher_func, |v| *v)
501
0
                .proto_digest_func()
502
0
                .into(),
503
0
        };
504
0
        self.update_action_result(Request::new(update_action_request))
505
0
            .await
506
0
            .map(|_| ())
507
0
    }
508
}
509
510
#[async_trait]
511
impl StoreDriver for GrpcStore {
512
    // NOTE: This function can only be safely used on CAS stores. AC stores may return a size that
513
    // is incorrect.
514
    async fn has_with_results(
515
        self: Pin<&Self>,
516
        keys: &[StoreKey<'_>],
517
        results: &mut [Option<u64>],
518
0
    ) -> Result<(), Error> {
519
0
        if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) {
520
0
            keys.iter()
521
0
                .zip(results.iter_mut())
522
0
                .map(|(key, result)| async move {
523
0
                    // The length of an AC is incorrect, so we don't figure out the
524
0
                    // length, instead the biggest possible result is returned in the
525
0
                    // hope that we detect incorrect usage.
526
0
                    self.get_action_result_from_digest(key.borrow().into_digest())
527
0
                        .await?;
528
0
                    *result = Some(u64::MAX);
529
0
                    Ok::<_, Error>(())
530
0
                })
531
0
                .collect::<FuturesUnordered<_>>()
532
0
                .try_for_each(|()| future::ready(Ok(())))
533
0
                .await
534
0
                .err_tip(|| "Getting upstream action cache entry")?;
535
0
            return Ok(());
536
0
        }
537
538
0
        let missing_blobs_response = self
539
0
            .find_missing_blobs(Request::new(FindMissingBlobsRequest {
540
0
                instance_name: self.instance_name.clone(),
541
0
                blob_digests: keys
542
0
                    .iter()
543
0
                    .map(|k| k.borrow().into_digest().into())
544
0
                    .collect(),
545
0
                digest_function: ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC)
546
0
                    .err_tip(|| "In GrpcStore::has_with_results")?
547
0
                    .map_or_else(default_digest_hasher_func, |v| *v)
548
0
                    .proto_digest_func()
549
0
                    .into(),
550
0
            }))
551
0
            .await?
552
0
            .into_inner();
553
0
554
0
        // Since the ordering is not guaranteed above, the matching has to check
555
0
        // all missing blobs against all entries in the unsorted digest list.
556
0
        // To optimise this, the missing digests are sorted and then it is
557
0
        // efficient to perform a binary search for each digest within the
558
0
        // missing list.
559
0
        let mut missing_digests =
560
0
            Vec::with_capacity(missing_blobs_response.missing_blob_digests.len());
561
0
        for missing_digest in missing_blobs_response.missing_blob_digests {
562
0
            missing_digests.push(DigestInfo::try_from(missing_digest)?);
563
        }
564
0
        missing_digests.sort_unstable();
565
0
        for (digest, result) in keys
566
0
            .iter()
567
0
            .map(|v| v.borrow().into_digest())
568
0
            .zip(results.iter_mut())
569
        {
570
0
            match missing_digests.binary_search(&digest) {
571
0
                Ok(_) => *result = None,
572
0
                Err(_) => *result = Some(digest.size_bytes()),
573
            }
574
        }
575
576
0
        Ok(())
577
0
    }
578
579
    async fn update(
580
        self: Pin<&Self>,
581
        key: StoreKey<'_>,
582
        reader: DropCloserReadHalf,
583
        _size_info: UploadSizeInfo,
584
0
    ) -> Result<(), Error> {
585
0
        let digest = key.into_digest();
586
0
        if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) {
587
0
            return self.update_action_result_from_bytes(digest, reader).await;
588
0
        }
589
590
0
        let digest_function = ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC)
591
0
            .err_tip(|| "In GrpcStore::update()")?
592
0
            .map_or_else(default_digest_hasher_func, |v| *v)
593
0
            .proto_digest_func()
594
0
            .as_str_name()
595
0
            .to_ascii_lowercase();
596
0
        let mut buf = Uuid::encode_buffer();
597
0
        let resource_name = format!(
598
0
            "{}/uploads/{}/blobs/{}/{}/{}",
599
0
            &self.instance_name,
600
0
            Uuid::new_v4().hyphenated().encode_lower(&mut buf),
601
0
            digest_function,
602
0
            digest.packed_hash(),
603
0
            digest.size_bytes(),
604
        );
605
606
        struct LocalState {
607
            resource_name: String,
608
            reader: DropCloserReadHalf,
609
            did_error: bool,
610
            bytes_received: i64,
611
        }
612
0
        let local_state = LocalState {
613
0
            resource_name,
614
0
            reader,
615
0
            did_error: false,
616
0
            bytes_received: 0,
617
0
        };
618
0
619
0
        let stream = Box::pin(unfold(local_state, |mut local_state| async move {
620
0
            if local_state.did_error {
  Branch (620:16): [True: 0, False: 0]
  Branch (620:16): [Folded - Ignored]
621
0
                event!(
622
0
                    Level::ERROR,
623
0
                    "GrpcStore::update() polled stream after error was returned"
624
                );
625
0
                return None;
626
0
            }
627
0
            let data = match local_state
628
0
                .reader
629
0
                .recv()
630
0
                .await
631
0
                .err_tip(|| "In GrpcStore::update()")
632
            {
633
0
                Ok(data) => data,
634
0
                Err(err) => {
635
0
                    local_state.did_error = true;
636
0
                    return Some((Err(err), local_state));
637
                }
638
            };
639
640
0
            let write_offset = local_state.bytes_received;
641
0
            local_state.bytes_received += data.len() as i64;
642
0
643
0
            Some((
644
0
                Ok(WriteRequest {
645
0
                    resource_name: local_state.resource_name.clone(),
646
0
                    write_offset,
647
0
                    finish_write: data.is_empty(), // EOF is when no data was polled.
648
0
                    data,
649
0
                }),
650
0
                local_state,
651
0
            ))
652
0
        }));
653
654
0
        self.write(
655
0
            WriteRequestStreamWrapper::from(stream)
656
0
                .await
657
0
                .err_tip(|| "in GrpcStore::update()")?,
658
        )
659
0
        .await
660
0
        .err_tip(|| "in GrpcStore::update()")?;
661
662
0
        Ok(())
663
0
    }
664
665
    async fn get_part(
666
        self: Pin<&Self>,
667
        key: StoreKey<'_>,
668
        writer: &mut DropCloserWriteHalf,
669
        offset: u64,
670
        length: Option<u64>,
671
0
    ) -> Result<(), Error> {
672
0
        let digest = key.into_digest();
673
0
        if matches!(self.store_type, nativelink_config::stores::StoreType::Ac) {
674
0
            let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?;
675
0
            let length = length
676
0
                .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
677
0
                .transpose()?;
678
679
0
            return self
680
0
                .get_action_result_as_part(digest, writer, offset, length)
681
0
                .await;
682
0
        }
683
0
684
0
        // Shortcut for empty blobs.
685
0
        if digest.size_bytes() == 0 {
  Branch (685:12): [True: 0, False: 0]
  Branch (685:12): [Folded - Ignored]
686
0
            return writer.send_eof();
687
0
        }
688
0
        let digest_function = ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC)
689
0
            .err_tip(|| "In GrpcStore::update()")?
690
0
            .map_or_else(default_digest_hasher_func, |v| *v)
691
0
            .proto_digest_func()
692
0
            .as_str_name()
693
0
            .to_ascii_lowercase();
694
0
        let resource_name = format!(
695
0
            "{}/blobs/{}/{}/{}",
696
0
            &self.instance_name,
697
0
            digest_function,
698
0
            digest.packed_hash(),
699
0
            digest.size_bytes(),
700
        );
701
702
        struct LocalState<'a> {
703
            resource_name: String,
704
            writer: &'a mut DropCloserWriteHalf,
705
            read_offset: i64,
706
            read_limit: i64,
707
        }
708
709
0
        let local_state = LocalState {
710
0
            resource_name,
711
0
            writer,
712
0
            read_offset: i64::try_from(offset).err_tip(|| "Could not convert offset to i64")?,
713
0
            read_limit: i64::try_from(length.unwrap_or(0))
714
0
                .err_tip(|| "Could not convert length to i64")?,
715
        };
716
717
0
        self.retrier
718
0
            .retry(unfold(local_state, move |mut local_state| async move {
719
0
                let request = ReadRequest {
720
0
                    resource_name: local_state.resource_name.clone(),
721
0
                    read_offset: local_state.read_offset,
722
0
                    read_limit: local_state.read_limit,
723
0
                };
724
0
                let mut stream = match self
725
0
                    .read_internal(request)
726
0
                    .await
727
0
                    .err_tip(|| "in GrpcStore::get_part()")
728
                {
729
0
                    Ok(stream) => stream,
730
0
                    Err(err) => return Some((RetryResult::Retry(err), local_state)),
731
                };
732
733
                loop {
734
0
                    let data = match stream.next().await {
735
                        // Create an empty response to represent EOF.
736
0
                        None => bytes::Bytes::new(),
737
0
                        Some(Ok(message)) => message.data,
738
0
                        Some(Err(status)) => {
739
0
                            return Some((
740
0
                                RetryResult::Retry(
741
0
                                    Into::<Error>::into(status)
742
0
                                        .append("While fetching message in GrpcStore::get_part()"),
743
0
                                ),
744
0
                                local_state,
745
0
                            ));
746
                        }
747
                    };
748
0
                    let length = data.len() as i64;
749
0
                    // This is the usual exit from the loop at EOF.
750
0
                    if length == 0 {
  Branch (750:24): [True: 0, False: 0]
  Branch (750:24): [Folded - Ignored]
751
0
                        let eof_result = local_state
752
0
                            .writer
753
0
                            .send_eof()
754
0
                            .err_tip(|| "Could not send eof in GrpcStore::get_part()")
755
0
                            .map_or_else(RetryResult::Err, RetryResult::Ok);
756
0
                        return Some((eof_result, local_state));
757
0
                    }
758
                    // Forward the data upstream.
759
0
                    if let Err(err) = local_state
  Branch (759:28): [True: 0, False: 0]
  Branch (759:28): [Folded - Ignored]
760
0
                        .writer
761
0
                        .send(data)
762
0
                        .await
763
0
                        .err_tip(|| "While sending in GrpcStore::get_part()")
764
                    {
765
0
                        return Some((RetryResult::Err(err), local_state));
766
0
                    }
767
0
                    local_state.read_offset += length;
768
                }
769
0
            }))
770
0
            .await
771
0
    }
772
773
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
774
0
        self
775
0
    }
776
777
0
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
778
0
        self
779
0
    }
780
781
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
782
0
        self
783
0
    }
784
}
785
786
default_health_status_indicator!(GrpcStore);