Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-store/src/grpc_store.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::pin::Pin;
17
use std::sync::Arc;
18
use std::time::Duration;
19
20
use async_trait::async_trait;
21
use bytes::BytesMut;
22
use futures::stream::{unfold, FuturesUnordered};
23
use futures::{future, Future, Stream, StreamExt, TryFutureExt, TryStreamExt};
24
use nativelink_error::{error_if, make_input_err, Error, ResultExt};
25
use nativelink_metric::MetricsComponent;
26
use nativelink_proto::build::bazel::remote::execution::v2::action_cache_client::ActionCacheClient;
27
use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_client::ContentAddressableStorageClient;
28
use nativelink_proto::build::bazel::remote::execution::v2::{
29
    ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest,
30
    BatchUpdateBlobsResponse, FindMissingBlobsRequest, FindMissingBlobsResponse,
31
    GetActionResultRequest, GetTreeRequest, GetTreeResponse, UpdateActionResultRequest,
32
};
33
use nativelink_proto::google::bytestream::byte_stream_client::ByteStreamClient;
34
use nativelink_proto::google::bytestream::{
35
    QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest,
36
    WriteResponse,
37
};
38
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
39
use nativelink_util::common::DigestInfo;
40
use nativelink_util::connection_manager::ConnectionManager;
41
use nativelink_util::digest_hasher::{default_digest_hasher_func, ACTIVE_HASHER_FUNC};
42
use nativelink_util::health_utils::HealthStatusIndicator;
43
use nativelink_util::origin_context::ActiveOriginContext;
44
use nativelink_util::proto_stream_utils::{
45
    FirstStream, WriteRequestStreamWrapper, WriteState, WriteStateWrapper,
46
};
47
use nativelink_util::resource_info::ResourceInfo;
48
use nativelink_util::retry::{Retrier, RetryResult};
49
use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
50
use nativelink_util::{default_health_status_indicator, tls_utils};
51
use parking_lot::Mutex;
52
use prost::Message;
53
use rand::rngs::OsRng;
54
use rand::Rng;
55
use tokio::time::sleep;
56
use tonic::{IntoRequest, Request, Response, Status, Streaming};
57
use tracing::{event, Level};
58
use uuid::Uuid;
59
60
// This store is usually a pass-through store, but can also be used as a CAS store. Using it as an
61
// AC store has one major side-effect... The has() function may not give the proper size of the
62
// underlying data. This might cause issues if embedded in certain stores.
63
0
#[derive(MetricsComponent)]
64
pub struct GrpcStore {
65
    #[metric(help = "Instance name for the store")]
66
    instance_name: String,
67
    store_type: nativelink_config::stores::StoreType,
68
    retrier: Retrier,
69
    connection_manager: ConnectionManager,
70
}
71
72
impl GrpcStore {
73
0
    pub async fn new(config: &nativelink_config::stores::GrpcStore) -> Result<Arc<Self>, Error> {
74
0
        let jitter_amt = config.retry.jitter;
75
0
        Self::new_with_jitter(
76
0
            config,
77
0
            Box::new(move |delay: Duration| {
78
0
                if jitter_amt == 0. {
  Branch (78:20): [True: 0, False: 0]
  Branch (78:20): [Folded - Ignored]
79
0
                    return delay;
80
0
                }
81
0
                let min = 1. - (jitter_amt / 2.);
82
0
                let max = 1. + (jitter_amt / 2.);
83
0
                delay.mul_f32(OsRng.gen_range(min..max))
84
0
            }),
85
0
        )
86
0
        .await
87
0
    }
88
89
0
    pub async fn new_with_jitter(
90
0
        config: &nativelink_config::stores::GrpcStore,
91
0
        jitter_fn: Box<dyn Fn(Duration) -> Duration + Send + Sync>,
92
0
    ) -> Result<Arc<Self>, Error> {
93
0
        error_if!(
94
0
            config.endpoints.is_empty(),
  Branch (94:13): [True: 0, False: 0]
  Branch (94:13): [Folded - Ignored]
95
            "Expected at least 1 endpoint in GrpcStore"
96
        );
97
0
        let mut endpoints = Vec::with_capacity(config.endpoints.len());
98
0
        for endpoint_config in &config.endpoints {
99
0
            let endpoint = tls_utils::endpoint(endpoint_config)
100
0
                .map_err(|e| make_input_err!("Invalid URI for GrpcStore endpoint : {e:?}"))?;
101
0
            endpoints.push(endpoint);
102
        }
103
104
0
        let jitter_fn = Arc::new(jitter_fn);
105
0
        Ok(Arc::new(GrpcStore {
106
0
            instance_name: config.instance_name.clone(),
107
0
            store_type: config.store_type,
108
0
            retrier: Retrier::new(
109
0
                Arc::new(|duration| Box::pin(sleep(duration))),
110
0
                jitter_fn.clone(),
111
0
                config.retry.clone(),
112
0
            ),
113
0
            connection_manager: ConnectionManager::new(
114
0
                endpoints.into_iter(),
115
0
                config.connections_per_endpoint,
116
0
                config.max_concurrent_requests,
117
0
                config.retry.clone(),
118
0
                jitter_fn,
119
0
            ),
120
0
        }))
121
0
    }
122
123
0
    async fn perform_request<F, Fut, R, I>(&self, input: I, mut request: F) -> Result<R, Error>
124
0
    where
125
0
        F: FnMut(I) -> Fut + Send + Copy,
126
0
        Fut: Future<Output = Result<R, Error>> + Send,
127
0
        R: Send,
128
0
        I: Send + Clone,
129
0
    {
130
0
        self.retrier
131
0
            .retry(unfold(input, move |input| async move {
132
0
                let input_clone = input.clone();
133
0
                Some((
134
0
                    request(input_clone)
135
0
                        .await
136
0
                        .map_or_else(RetryResult::Retry, RetryResult::Ok),
137
0
                    input,
138
0
                ))
139
0
            }))
140
0
            .await
141
0
    }
142
143
0
    pub async fn find_missing_blobs(
144
0
        &self,
145
0
        grpc_request: Request<FindMissingBlobsRequest>,
146
0
    ) -> Result<Response<FindMissingBlobsResponse>, Error> {
147
0
        error_if!(
148
0
            matches!(self.store_type, nativelink_config::stores::StoreType::ac),
149
            "CAS operation on AC store"
150
        );
151
152
0
        let mut request = grpc_request.into_inner();
153
0
        request.instance_name.clone_from(&self.instance_name);
154
0
        self.perform_request(request, |request| async move {
155
0
            let channel = self
156
0
                .connection_manager
157
0
                .connection()
158
0
                .await
159
0
                .err_tip(|| "in find_missing_blobs")?;
160
0
            ContentAddressableStorageClient::new(channel)
161
0
                .find_missing_blobs(Request::new(request))
162
0
                .await
163
0
                .err_tip(|| "in GrpcStore::find_missing_blobs")
164
0
        })
165
0
        .await
166
0
    }
167
168
0
    pub async fn batch_update_blobs(
169
0
        &self,
170
0
        grpc_request: Request<BatchUpdateBlobsRequest>,
171
0
    ) -> Result<Response<BatchUpdateBlobsResponse>, Error> {
172
0
        error_if!(
173
0
            matches!(self.store_type, nativelink_config::stores::StoreType::ac),
174
            "CAS operation on AC store"
175
        );
176
177
0
        let mut request = grpc_request.into_inner();
178
0
        request.instance_name.clone_from(&self.instance_name);
179
0
        self.perform_request(request, |request| async move {
180
0
            let channel = self
181
0
                .connection_manager
182
0
                .connection()
183
0
                .await
184
0
                .err_tip(|| "in batch_update_blobs")?;
185
0
            ContentAddressableStorageClient::new(channel)
186
0
                .batch_update_blobs(Request::new(request))
187
0
                .await
188
0
                .err_tip(|| "in GrpcStore::batch_update_blobs")
189
0
        })
190
0
        .await
191
0
    }
192
193
0
    pub async fn batch_read_blobs(
194
0
        &self,
195
0
        grpc_request: Request<BatchReadBlobsRequest>,
196
0
    ) -> Result<Response<BatchReadBlobsResponse>, Error> {
197
0
        error_if!(
198
0
            matches!(self.store_type, nativelink_config::stores::StoreType::ac),
199
            "CAS operation on AC store"
200
        );
201
202
0
        let mut request = grpc_request.into_inner();
203
0
        request.instance_name.clone_from(&self.instance_name);
204
0
        self.perform_request(request, |request| async move {
205
0
            let channel = self
206
0
                .connection_manager
207
0
                .connection()
208
0
                .await
209
0
                .err_tip(|| "in batch_read_blobs")?;
210
0
            ContentAddressableStorageClient::new(channel)
211
0
                .batch_read_blobs(Request::new(request))
212
0
                .await
213
0
                .err_tip(|| "in GrpcStore::batch_read_blobs")
214
0
        })
215
0
        .await
216
0
    }
217
218
0
    pub async fn get_tree(
219
0
        &self,
220
0
        grpc_request: Request<GetTreeRequest>,
221
0
    ) -> Result<Response<Streaming<GetTreeResponse>>, Error> {
222
0
        error_if!(
223
0
            matches!(self.store_type, nativelink_config::stores::StoreType::ac),
224
            "CAS operation on AC store"
225
        );
226
227
0
        let mut request = grpc_request.into_inner();
228
0
        request.instance_name.clone_from(&self.instance_name);
229
0
        self.perform_request(request, |request| async move {
230
0
            let channel = self
231
0
                .connection_manager
232
0
                .connection()
233
0
                .await
234
0
                .err_tip(|| "in get_tree")?;
235
0
            ContentAddressableStorageClient::new(channel)
236
0
                .get_tree(Request::new(request))
237
0
                .await
238
0
                .err_tip(|| "in GrpcStore::get_tree")
239
0
        })
240
0
        .await
241
0
    }
242
243
0
    fn get_read_request(&self, mut request: ReadRequest) -> Result<ReadRequest, Error> {
244
        const IS_UPLOAD_FALSE: bool = false;
245
0
        let mut resource_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_FALSE)?;
246
0
        if resource_info.instance_name != self.instance_name {
  Branch (246:12): [True: 0, False: 0]
  Branch (246:12): [Folded - Ignored]
247
0
            resource_info.instance_name = Cow::Borrowed(&self.instance_name);
248
0
            request.resource_name = resource_info.to_string(IS_UPLOAD_FALSE);
249
0
        }
250
0
        Ok(request)
251
0
    }
252
253
0
    async fn read_internal(
254
0
        &self,
255
0
        request: ReadRequest,
256
0
    ) -> Result<impl Stream<Item = Result<ReadResponse, Status>>, Error> {
257
0
        let channel = self
258
0
            .connection_manager
259
0
            .connection()
260
0
            .await
261
0
            .err_tip(|| "in read_internal")?;
262
0
        let mut response = ByteStreamClient::new(channel)
263
0
            .read(Request::new(request))
264
0
            .await
265
0
            .err_tip(|| "in GrpcStore::read")?
266
0
            .into_inner();
267
0
        let first_response = response
268
0
            .message()
269
0
            .await
270
0
            .err_tip(|| "Fetching first chunk in GrpcStore::read()")?;
271
0
        Ok(FirstStream::new(first_response, response))
272
0
    }
273
274
0
    pub async fn read(
275
0
        &self,
276
0
        grpc_request: impl IntoRequest<ReadRequest>,
277
0
    ) -> Result<impl Stream<Item = Result<ReadResponse, Status>>, Error> {
278
0
        error_if!(
279
0
            matches!(self.store_type, nativelink_config::stores::StoreType::ac),
280
            "CAS operation on AC store"
281
        );
282
283
0
        let request = self.get_read_request(grpc_request.into_request().into_inner())?;
284
0
        self.perform_request(request, |request| async move {
285
0
            self.read_internal(request).await
286
0
        })
287
0
        .await
288
0
    }
289
290
0
    pub async fn write<T, E>(
291
0
        &self,
292
0
        stream: WriteRequestStreamWrapper<T, E>,
293
0
    ) -> Result<Response<WriteResponse>, Error>
294
0
    where
295
0
        T: Stream<Item = Result<WriteRequest, E>> + Unpin + Send + 'static,
296
0
        E: Into<Error> + 'static,
297
0
    {
298
0
        error_if!(
299
0
            matches!(self.store_type, nativelink_config::stores::StoreType::ac),
300
            "CAS operation on AC store"
301
        );
302
303
0
        let local_state = Arc::new(Mutex::new(WriteState::new(
304
0
            self.instance_name.clone(),
305
0
            stream,
306
0
        )));
307
308
0
        let result = self
309
0
            .retrier
310
0
            .retry(unfold(local_state, move |local_state| async move {
311
                // The client write may occur on a separate thread and
312
                // therefore in order to share the state with it we have to
313
                // wrap it in a Mutex and retrieve it after the write
314
                // has completed.  There is no way to get the value back
315
                // from the client.
316
0
                let result = self
317
0
                    .connection_manager
318
0
                    .connection()
319
0
                    .and_then(|channel| async {
320
0
                        ByteStreamClient::new(channel)
321
0
                            .write(WriteStateWrapper::new(local_state.clone()))
322
0
                            .await
323
0
                            .err_tip(|| "in GrpcStore::write")
324
0
                    })
325
0
                    .await;
326
327
                // Get the state back from StateWrapper, this should be
328
                // uncontended since write has returned.
329
0
                let mut local_state_locked = local_state.lock();
330
331
0
                let result = if let Some(err) = local_state_locked.take_read_stream_error() {
  Branch (331:37): [True: 0, False: 0]
  Branch (331:37): [True: 0, False: 0]
  Branch (331:37): [Folded - Ignored]
332
                    // If there was an error with the stream, then don't retry.
333
0
                    RetryResult::Err(err.append("Where read_stream_error was set"))
334
                } else {
335
                    // On error determine whether it is possible to retry.
336
0
                    match result {
337
0
                        Err(err) => {
338
0
                            if local_state_locked.can_resume() {
  Branch (338:32): [True: 0, False: 0]
  Branch (338:32): [True: 0, False: 0]
  Branch (338:32): [Folded - Ignored]
339
0
                                local_state_locked.resume();
340
0
                                RetryResult::Retry(err)
341
                            } else {
342
0
                                RetryResult::Err(err.append("Retry is not possible"))
343
                            }
344
                        }
345
0
                        Ok(response) => RetryResult::Ok(response),
346
                    }
347
                };
348
349
0
                drop(local_state_locked);
350
0
                Some((result, local_state))
351
0
            }))
352
0
            .await?;
353
0
        Ok(result)
354
0
    }
355
356
0
    pub async fn query_write_status(
357
0
        &self,
358
0
        grpc_request: Request<QueryWriteStatusRequest>,
359
0
    ) -> Result<Response<QueryWriteStatusResponse>, Error> {
360
0
        error_if!(
361
0
            matches!(self.store_type, nativelink_config::stores::StoreType::ac),
362
            "CAS operation on AC store"
363
        );
364
365
0
        let mut request = grpc_request.into_inner();
366
367
        const IS_UPLOAD_TRUE: bool = true;
368
0
        let mut request_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_TRUE)?;
369
0
        if request_info.instance_name != self.instance_name {
  Branch (369:12): [True: 0, False: 0]
  Branch (369:12): [Folded - Ignored]
  Branch (369:12): [Folded - Ignored]
370
0
            request_info.instance_name = Cow::Borrowed(&self.instance_name);
371
0
            request.resource_name = request_info.to_string(IS_UPLOAD_TRUE);
372
0
        }
373
374
0
        self.perform_request(request, |request| async move {
375
0
            let channel = self
376
0
                .connection_manager
377
0
                .connection()
378
0
                .await
379
0
                .err_tip(|| "in query_write_status")?;
380
0
            ByteStreamClient::new(channel)
381
0
                .query_write_status(Request::new(request))
382
0
                .await
383
0
                .err_tip(|| "in GrpcStore::query_write_status")
384
0
        })
385
0
        .await
386
0
    }
387
388
0
    pub async fn get_action_result(
389
0
        &self,
390
0
        grpc_request: Request<GetActionResultRequest>,
391
0
    ) -> Result<Response<ActionResult>, Error> {
392
0
        let mut request = grpc_request.into_inner();
393
0
        request.instance_name.clone_from(&self.instance_name);
394
0
        self.perform_request(request, |request| async move {
395
0
            let channel = self
396
0
                .connection_manager
397
0
                .connection()
398
0
                .await
399
0
                .err_tip(|| "in get_action_result")?;
400
0
            ActionCacheClient::new(channel)
401
0
                .get_action_result(Request::new(request))
402
0
                .await
403
0
                .err_tip(|| "in GrpcStore::get_action_result")
404
0
        })
405
0
        .await
406
0
    }
407
408
0
    pub async fn update_action_result(
409
0
        &self,
410
0
        grpc_request: Request<UpdateActionResultRequest>,
411
0
    ) -> Result<Response<ActionResult>, Error> {
412
0
        let mut request = grpc_request.into_inner();
413
0
        request.instance_name.clone_from(&self.instance_name);
414
0
        self.perform_request(request, |request| async move {
415
0
            let channel = self
416
0
                .connection_manager
417
0
                .connection()
418
0
                .await
419
0
                .err_tip(|| "in update_action_result")?;
420
0
            ActionCacheClient::new(channel)
421
0
                .update_action_result(Request::new(request))
422
0
                .await
423
0
                .err_tip(|| "in GrpcStore::update_action_result")
424
0
        })
425
0
        .await
426
0
    }
427
428
0
    async fn get_action_result_from_digest(
429
0
        &self,
430
0
        digest: DigestInfo,
431
0
    ) -> Result<Response<ActionResult>, Error> {
432
0
        let action_result_request = GetActionResultRequest {
433
0
            instance_name: self.instance_name.clone(),
434
0
            action_digest: Some(digest.into()),
435
0
            inline_stdout: false,
436
0
            inline_stderr: false,
437
0
            inline_output_files: Vec::new(),
438
0
            digest_function: ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC)
439
0
                .err_tip(|| "In get_action_from_store")?
440
0
                .map_or_else(default_digest_hasher_func, |v| *v)
441
0
                .proto_digest_func()
442
0
                .into(),
443
0
        };
444
0
        self.get_action_result(Request::new(action_result_request))
445
0
            .await
446
0
    }
447
448
0
    async fn get_action_result_as_part(
449
0
        &self,
450
0
        digest: DigestInfo,
451
0
        writer: &mut DropCloserWriteHalf,
452
0
        offset: usize,
453
0
        length: Option<usize>,
454
0
    ) -> Result<(), Error> {
455
0
        let action_result = self
456
0
            .get_action_result_from_digest(digest)
457
0
            .await
458
0
            .map(Response::into_inner)
459
0
            .err_tip(|| "Action result not found")?;
460
        // TODO: Would be better to avoid all the encoding and decoding in this
461
        //       file, however there's no way to currently get raw bytes from a
462
        //       generated prost request unfortunately.
463
0
        let mut value = BytesMut::new();
464
0
        action_result
465
0
            .encode(&mut value)
466
0
            .err_tip(|| "Could not encode upstream action result")?;
467
468
0
        let default_len = value.len() - offset;
469
0
        let length = length.unwrap_or(default_len).min(default_len);
470
0
        if length > 0 {
  Branch (470:12): [True: 0, False: 0]
  Branch (470:12): [Folded - Ignored]
471
0
            writer
472
0
                .send(value.freeze().slice(offset..offset + length))
473
0
                .await
474
0
                .err_tip(|| "Failed to write data in grpc store")?;
475
0
        }
476
0
        writer
477
0
            .send_eof()
478
0
            .err_tip(|| "Failed to write EOF in grpc store get_action_result_as_part")?;
479
0
        Ok(())
480
0
    }
481
482
0
    async fn update_action_result_from_bytes(
483
0
        &self,
484
0
        digest: DigestInfo,
485
0
        mut reader: DropCloserReadHalf,
486
0
    ) -> Result<(), Error> {
487
0
        let action_result = ActionResult::decode(reader.consume(None).await?)
488
0
            .err_tip(|| "Failed to decode ActionResult in update_action_result_from_bytes")?;
489
0
        let update_action_request = UpdateActionResultRequest {
490
0
            instance_name: self.instance_name.clone(),
491
0
            action_digest: Some(digest.into()),
492
0
            action_result: Some(action_result),
493
0
            results_cache_policy: None,
494
0
            digest_function: ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC)
495
0
                .err_tip(|| "In get_action_from_store")?
496
0
                .map_or_else(default_digest_hasher_func, |v| *v)
497
0
                .proto_digest_func()
498
0
                .into(),
499
0
        };
500
0
        self.update_action_result(Request::new(update_action_request))
501
0
            .await
502
0
            .map(|_| ())
503
0
    }
504
}
505
506
#[async_trait]
507
impl StoreDriver for GrpcStore {
508
    // NOTE: This function can only be safely used on CAS stores. AC stores may return a size that
509
    // is incorrect.
510
    async fn has_with_results(
511
        self: Pin<&Self>,
512
        keys: &[StoreKey<'_>],
513
        results: &mut [Option<u64>],
514
0
    ) -> Result<(), Error> {
515
0
        if matches!(self.store_type, nativelink_config::stores::StoreType::ac) {
516
0
            keys.iter()
517
0
                .zip(results.iter_mut())
518
0
                .map(|(key, result)| async move {
519
0
                    // The length of an AC is incorrect, so we don't figure out the
520
0
                    // length, instead the biggest possible result is returned in the
521
0
                    // hope that we detect incorrect usage.
522
0
                    self.get_action_result_from_digest(key.borrow().into_digest())
523
0
                        .await?;
524
0
                    *result = Some(u64::MAX);
525
0
                    Ok::<_, Error>(())
526
0
                })
527
0
                .collect::<FuturesUnordered<_>>()
528
0
                .try_for_each(|_| future::ready(Ok(())))
529
0
                .await
530
0
                .err_tip(|| "Getting upstream action cache entry")?;
531
0
            return Ok(());
532
0
        }
533
534
0
        let missing_blobs_response = self
535
0
            .find_missing_blobs(Request::new(FindMissingBlobsRequest {
536
0
                instance_name: self.instance_name.clone(),
537
0
                blob_digests: keys
538
0
                    .iter()
539
0
                    .map(|k| k.borrow().into_digest().into())
540
0
                    .collect(),
541
0
                digest_function: ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC)
542
0
                    .err_tip(|| "In GrpcStore::has_with_results")?
543
0
                    .map_or_else(default_digest_hasher_func, |v| *v)
544
0
                    .proto_digest_func()
545
0
                    .into(),
546
0
            }))
547
0
            .await?
548
0
            .into_inner();
549
0
550
0
        // Since the ordering is not guaranteed above, the matching has to check
551
0
        // all missing blobs against all entries in the unsorted digest list.
552
0
        // To optimise this, the missing digests are sorted and then it is
553
0
        // efficient to perform a binary search for each digest within the
554
0
        // missing list.
555
0
        let mut missing_digests =
556
0
            Vec::with_capacity(missing_blobs_response.missing_blob_digests.len());
557
0
        for missing_digest in missing_blobs_response.missing_blob_digests {
558
0
            missing_digests.push(DigestInfo::try_from(missing_digest)?);
559
        }
560
0
        missing_digests.sort_unstable();
561
0
        for (digest, result) in keys
562
0
            .iter()
563
0
            .map(|v| v.borrow().into_digest())
564
0
            .zip(results.iter_mut())
565
        {
566
0
            match missing_digests.binary_search(&digest) {
567
0
                Ok(_) => *result = None,
568
0
                Err(_) => *result = Some(digest.size_bytes()),
569
            }
570
        }
571
572
0
        Ok(())
573
0
    }
574
575
    async fn update(
576
        self: Pin<&Self>,
577
        key: StoreKey<'_>,
578
        reader: DropCloserReadHalf,
579
        _size_info: UploadSizeInfo,
580
0
    ) -> Result<(), Error> {
581
0
        let digest = key.into_digest();
582
0
        if matches!(self.store_type, nativelink_config::stores::StoreType::ac) {
583
0
            return self.update_action_result_from_bytes(digest, reader).await;
584
0
        }
585
0
586
0
        let mut buf = Uuid::encode_buffer();
587
0
        let resource_name = format!(
588
0
            "{}/uploads/{}/blobs/{}/{}",
589
0
            &self.instance_name,
590
0
            Uuid::new_v4().hyphenated().encode_lower(&mut buf),
591
0
            digest.packed_hash(),
592
0
            digest.size_bytes(),
593
0
        );
594
595
        struct LocalState {
596
            resource_name: String,
597
            reader: DropCloserReadHalf,
598
            did_error: bool,
599
            bytes_received: i64,
600
        }
601
0
        let local_state = LocalState {
602
0
            resource_name,
603
0
            reader,
604
0
            did_error: false,
605
0
            bytes_received: 0,
606
0
        };
607
0
608
0
        let stream = Box::pin(unfold(local_state, |mut local_state| async move {
609
0
            if local_state.did_error {
  Branch (609:16): [True: 0, False: 0]
  Branch (609:16): [Folded - Ignored]
610
0
                event!(
611
0
                    Level::ERROR,
612
0
                    "GrpcStore::update() polled stream after error was returned"
613
                );
614
0
                return None;
615
0
            }
616
0
            let data = match local_state
617
0
                .reader
618
0
                .recv()
619
0
                .await
620
0
                .err_tip(|| "In GrpcStore::update()")
621
            {
622
0
                Ok(data) => data,
623
0
                Err(err) => {
624
0
                    local_state.did_error = true;
625
0
                    return Some((Err(err), local_state));
626
                }
627
            };
628
629
0
            let write_offset = local_state.bytes_received;
630
0
            local_state.bytes_received += data.len() as i64;
631
0
632
0
            Some((
633
0
                Ok(WriteRequest {
634
0
                    resource_name: local_state.resource_name.clone(),
635
0
                    write_offset,
636
0
                    finish_write: data.is_empty(), // EOF is when no data was polled.
637
0
                    data,
638
0
                }),
639
0
                local_state,
640
0
            ))
641
0
        }));
642
0
643
0
        self.write(
644
0
            WriteRequestStreamWrapper::from(stream)
645
0
                .await
646
0
                .err_tip(|| "in GrpcStore::update()")?,
647
        )
648
0
        .await
649
0
        .err_tip(|| "in GrpcStore::update()")?;
650
651
0
        Ok(())
652
0
    }
653
654
    async fn get_part(
655
        self: Pin<&Self>,
656
        key: StoreKey<'_>,
657
        writer: &mut DropCloserWriteHalf,
658
        offset: u64,
659
        length: Option<u64>,
660
0
    ) -> Result<(), Error> {
661
0
        let digest = key.into_digest();
662
0
        if matches!(self.store_type, nativelink_config::stores::StoreType::ac) {
663
0
            let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?;
664
0
            let length = length
665
0
                .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
666
0
                .transpose()?;
667
668
0
            return self
669
0
                .get_action_result_as_part(digest, writer, offset, length)
670
0
                .await;
671
0
        }
672
0
673
0
        // Shortcut for empty blobs.
674
0
        if digest.size_bytes() == 0 {
  Branch (674:12): [True: 0, False: 0]
  Branch (674:12): [Folded - Ignored]
675
0
            return writer.send_eof();
676
0
        }
677
0
678
0
        let resource_name = format!(
679
0
            "{}/blobs/{}/{}",
680
0
            &self.instance_name,
681
0
            digest.packed_hash(),
682
0
            digest.size_bytes(),
683
0
        );
684
685
        struct LocalState<'a> {
686
            resource_name: String,
687
            writer: &'a mut DropCloserWriteHalf,
688
            read_offset: i64,
689
            read_limit: i64,
690
        }
691
692
0
        let local_state = LocalState {
693
0
            resource_name,
694
0
            writer,
695
0
            read_offset: i64::try_from(offset).err_tip(|| "Could not convert offset to i64")?,
696
0
            read_limit: i64::try_from(length.unwrap_or(0))
697
0
                .err_tip(|| "Could not convert length to i64")?,
698
        };
699
700
0
        self.retrier
701
0
            .retry(unfold(local_state, move |mut local_state| async move {
702
0
                let request = ReadRequest {
703
0
                    resource_name: local_state.resource_name.clone(),
704
0
                    read_offset: local_state.read_offset,
705
0
                    read_limit: local_state.read_limit,
706
0
                };
707
0
                let mut stream = match self
708
0
                    .read_internal(request)
709
0
                    .await
710
0
                    .err_tip(|| "in GrpcStore::get_part()")
711
                {
712
0
                    Ok(stream) => stream,
713
0
                    Err(err) => return Some((RetryResult::Retry(err), local_state)),
714
                };
715
716
                loop {
717
0
                    let data = match stream.next().await {
718
                        // Create an empty response to represent EOF.
719
0
                        None => bytes::Bytes::new(),
720
0
                        Some(Ok(message)) => message.data,
721
0
                        Some(Err(status)) => {
722
0
                            return Some((
723
0
                                RetryResult::Retry(
724
0
                                    Into::<Error>::into(status)
725
0
                                        .append("While fetching message in GrpcStore::get_part()"),
726
0
                                ),
727
0
                                local_state,
728
0
                            ))
729
                        }
730
                    };
731
0
                    let length = data.len() as i64;
732
0
                    // This is the usual exit from the loop at EOF.
733
0
                    if length == 0 {
  Branch (733:24): [True: 0, False: 0]
  Branch (733:24): [Folded - Ignored]
734
0
                        let eof_result = local_state
735
0
                            .writer
736
0
                            .send_eof()
737
0
                            .err_tip(|| "Could not send eof in GrpcStore::get_part()")
738
0
                            .map_or_else(RetryResult::Err, RetryResult::Ok);
739
0
                        return Some((eof_result, local_state));
740
0
                    }
741
                    // Forward the data upstream.
742
0
                    if let Err(err) = local_state
  Branch (742:28): [True: 0, False: 0]
  Branch (742:28): [Folded - Ignored]
743
0
                        .writer
744
0
                        .send(data)
745
0
                        .await
746
0
                        .err_tip(|| "While sending in GrpcStore::get_part()")
747
                    {
748
0
                        return Some((RetryResult::Err(err), local_state));
749
0
                    }
750
0
                    local_state.read_offset += length;
751
                }
752
0
            }))
753
0
            .await
754
0
    }
755
756
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
757
0
        self
758
0
    }
759
760
0
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
761
0
        self
762
0
    }
763
764
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
765
0
        self
766
0
    }
767
}
768
769
default_health_status_indicator!(GrpcStore);