Coverage Report

Created: 2024-11-22 20:17

/build/source/nativelink-store/src/grpc_store.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::pin::Pin;
17
use std::sync::Arc;
18
use std::time::Duration;
19
20
use async_trait::async_trait;
21
use bytes::BytesMut;
22
use futures::stream::{unfold, FuturesUnordered};
23
use futures::{future, Future, Stream, StreamExt, TryFutureExt, TryStreamExt};
24
use nativelink_config::stores::GrpcSpec;
25
use nativelink_error::{error_if, make_input_err, Error, ResultExt};
26
use nativelink_metric::MetricsComponent;
27
use nativelink_proto::build::bazel::remote::execution::v2::action_cache_client::ActionCacheClient;
28
use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_client::ContentAddressableStorageClient;
29
use nativelink_proto::build::bazel::remote::execution::v2::{
30
    ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest,
31
    BatchUpdateBlobsResponse, FindMissingBlobsRequest, FindMissingBlobsResponse,
32
    GetActionResultRequest, GetTreeRequest, GetTreeResponse, UpdateActionResultRequest,
33
};
34
use nativelink_proto::google::bytestream::byte_stream_client::ByteStreamClient;
35
use nativelink_proto::google::bytestream::{
36
    QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest,
37
    WriteResponse,
38
};
39
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
40
use nativelink_util::common::DigestInfo;
41
use nativelink_util::connection_manager::ConnectionManager;
42
use nativelink_util::digest_hasher::{default_digest_hasher_func, ACTIVE_HASHER_FUNC};
43
use nativelink_util::health_utils::HealthStatusIndicator;
44
use nativelink_util::origin_context::ActiveOriginContext;
45
use nativelink_util::proto_stream_utils::{
46
    FirstStream, WriteRequestStreamWrapper, WriteState, WriteStateWrapper,
47
};
48
use nativelink_util::resource_info::ResourceInfo;
49
use nativelink_util::retry::{Retrier, RetryResult};
50
use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
51
use nativelink_util::{default_health_status_indicator, tls_utils};
52
use parking_lot::Mutex;
53
use prost::Message;
54
use rand::rngs::OsRng;
55
use rand::Rng;
56
use tokio::time::sleep;
57
use tonic::{IntoRequest, Request, Response, Status, Streaming};
58
use tracing::{event, Level};
59
use uuid::Uuid;
60
61
// This store is usually a pass-through store, but can also be used as a CAS store. Using it as an
62
// AC store has one major side-effect... The has() function may not give the proper size of the
63
// underlying data. This might cause issues if embedded in certain stores.
64
0
#[derive(MetricsComponent)]
65
pub struct GrpcStore {
66
    #[metric(help = "Instance name for the store")]
67
    instance_name: String,
68
    store_type: nativelink_config::stores::StoreType,
69
    retrier: Retrier,
70
    connection_manager: ConnectionManager,
71
}
72
73
impl GrpcStore {
74
0
    pub async fn new(spec: &GrpcSpec) -> Result<Arc<Self>, Error> {
75
0
        let jitter_amt = spec.retry.jitter;
76
0
        Self::new_with_jitter(
77
0
            spec,
78
0
            Box::new(move |delay: Duration| {
79
0
                if jitter_amt == 0. {
  Branch (79:20): [True: 0, False: 0]
  Branch (79:20): [Folded - Ignored]
80
0
                    return delay;
81
0
                }
82
0
                let min = 1. - (jitter_amt / 2.);
83
0
                let max = 1. + (jitter_amt / 2.);
84
0
                delay.mul_f32(OsRng.gen_range(min..max))
85
0
            }),
86
0
        )
87
0
        .await
88
0
    }
89
90
0
    pub async fn new_with_jitter(
91
0
        spec: &GrpcSpec,
92
0
        jitter_fn: Box<dyn Fn(Duration) -> Duration + Send + Sync>,
93
0
    ) -> Result<Arc<Self>, Error> {
94
0
        error_if!(
95
0
            spec.endpoints.is_empty(),
  Branch (95:13): [True: 0, False: 0]
  Branch (95:13): [Folded - Ignored]
96
            "Expected at least 1 endpoint in GrpcStore"
97
        );
98
0
        let mut endpoints = Vec::with_capacity(spec.endpoints.len());
99
0
        for endpoint_config in &spec.endpoints {
100
0
            let endpoint = tls_utils::endpoint(endpoint_config)
101
0
                .map_err(|e| make_input_err!("Invalid URI for GrpcStore endpoint : {e:?}"))?;
102
0
            endpoints.push(endpoint);
103
        }
104
105
0
        let jitter_fn = Arc::new(jitter_fn);
106
0
        Ok(Arc::new(GrpcStore {
107
0
            instance_name: spec.instance_name.clone(),
108
0
            store_type: spec.store_type,
109
0
            retrier: Retrier::new(
110
0
                Arc::new(|duration| Box::pin(sleep(duration))),
111
0
                jitter_fn.clone(),
112
0
                spec.retry.clone(),
113
0
            ),
114
0
            connection_manager: ConnectionManager::new(
115
0
                endpoints.into_iter(),
116
0
                spec.connections_per_endpoint,
117
0
                spec.max_concurrent_requests,
118
0
                spec.retry.clone(),
119
0
                jitter_fn,
120
0
            ),
121
0
        }))
122
0
    }
123
124
0
    async fn perform_request<F, Fut, R, I>(&self, input: I, mut request: F) -> Result<R, Error>
125
0
    where
126
0
        F: FnMut(I) -> Fut + Send + Copy,
127
0
        Fut: Future<Output = Result<R, Error>> + Send,
128
0
        R: Send,
129
0
        I: Send + Clone,
130
0
    {
131
0
        self.retrier
132
0
            .retry(unfold(input, move |input| async move {
133
0
                let input_clone = input.clone();
134
0
                Some((
135
0
                    request(input_clone)
136
0
                        .await
137
0
                        .map_or_else(RetryResult::Retry, RetryResult::Ok),
138
0
                    input,
139
0
                ))
140
0
            }))
141
0
            .await
142
0
    }
143
144
0
    pub async fn find_missing_blobs(
145
0
        &self,
146
0
        grpc_request: Request<FindMissingBlobsRequest>,
147
0
    ) -> Result<Response<FindMissingBlobsResponse>, Error> {
148
0
        error_if!(
149
0
            matches!(self.store_type, nativelink_config::stores::StoreType::ac),
150
            "CAS operation on AC store"
151
        );
152
153
0
        let mut request = grpc_request.into_inner();
154
0
        request.instance_name.clone_from(&self.instance_name);
155
0
        self.perform_request(request, |request| async move {
156
0
            let channel = self
157
0
                .connection_manager
158
0
                .connection()
159
0
                .await
160
0
                .err_tip(|| "in find_missing_blobs")?;
161
0
            ContentAddressableStorageClient::new(channel)
162
0
                .find_missing_blobs(Request::new(request))
163
0
                .await
164
0
                .err_tip(|| "in GrpcStore::find_missing_blobs")
165
0
        })
166
0
        .await
167
0
    }
168
169
0
    pub async fn batch_update_blobs(
170
0
        &self,
171
0
        grpc_request: Request<BatchUpdateBlobsRequest>,
172
0
    ) -> Result<Response<BatchUpdateBlobsResponse>, Error> {
173
0
        error_if!(
174
0
            matches!(self.store_type, nativelink_config::stores::StoreType::ac),
175
            "CAS operation on AC store"
176
        );
177
178
0
        let mut request = grpc_request.into_inner();
179
0
        request.instance_name.clone_from(&self.instance_name);
180
0
        self.perform_request(request, |request| async move {
181
0
            let channel = self
182
0
                .connection_manager
183
0
                .connection()
184
0
                .await
185
0
                .err_tip(|| "in batch_update_blobs")?;
186
0
            ContentAddressableStorageClient::new(channel)
187
0
                .batch_update_blobs(Request::new(request))
188
0
                .await
189
0
                .err_tip(|| "in GrpcStore::batch_update_blobs")
190
0
        })
191
0
        .await
192
0
    }
193
194
0
    pub async fn batch_read_blobs(
195
0
        &self,
196
0
        grpc_request: Request<BatchReadBlobsRequest>,
197
0
    ) -> Result<Response<BatchReadBlobsResponse>, Error> {
198
0
        error_if!(
199
0
            matches!(self.store_type, nativelink_config::stores::StoreType::ac),
200
            "CAS operation on AC store"
201
        );
202
203
0
        let mut request = grpc_request.into_inner();
204
0
        request.instance_name.clone_from(&self.instance_name);
205
0
        self.perform_request(request, |request| async move {
206
0
            let channel = self
207
0
                .connection_manager
208
0
                .connection()
209
0
                .await
210
0
                .err_tip(|| "in batch_read_blobs")?;
211
0
            ContentAddressableStorageClient::new(channel)
212
0
                .batch_read_blobs(Request::new(request))
213
0
                .await
214
0
                .err_tip(|| "in GrpcStore::batch_read_blobs")
215
0
        })
216
0
        .await
217
0
    }
218
219
0
    pub async fn get_tree(
220
0
        &self,
221
0
        grpc_request: Request<GetTreeRequest>,
222
0
    ) -> Result<Response<Streaming<GetTreeResponse>>, Error> {
223
0
        error_if!(
224
0
            matches!(self.store_type, nativelink_config::stores::StoreType::ac),
225
            "CAS operation on AC store"
226
        );
227
228
0
        let mut request = grpc_request.into_inner();
229
0
        request.instance_name.clone_from(&self.instance_name);
230
0
        self.perform_request(request, |request| async move {
231
0
            let channel = self
232
0
                .connection_manager
233
0
                .connection()
234
0
                .await
235
0
                .err_tip(|| "in get_tree")?;
236
0
            ContentAddressableStorageClient::new(channel)
237
0
                .get_tree(Request::new(request))
238
0
                .await
239
0
                .err_tip(|| "in GrpcStore::get_tree")
240
0
        })
241
0
        .await
242
0
    }
243
244
0
    fn get_read_request(&self, mut request: ReadRequest) -> Result<ReadRequest, Error> {
245
        const IS_UPLOAD_FALSE: bool = false;
246
0
        let mut resource_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_FALSE)?;
247
0
        if resource_info.instance_name != self.instance_name {
  Branch (247:12): [True: 0, False: 0]
  Branch (247:12): [Folded - Ignored]
248
0
            resource_info.instance_name = Cow::Borrowed(&self.instance_name);
249
0
            request.resource_name = resource_info.to_string(IS_UPLOAD_FALSE);
250
0
        }
251
0
        Ok(request)
252
0
    }
253
254
0
    async fn read_internal(
255
0
        &self,
256
0
        request: ReadRequest,
257
0
    ) -> Result<impl Stream<Item = Result<ReadResponse, Status>>, Error> {
258
0
        let channel = self
259
0
            .connection_manager
260
0
            .connection()
261
0
            .await
262
0
            .err_tip(|| "in read_internal")?;
263
0
        let mut response = ByteStreamClient::new(channel)
264
0
            .read(Request::new(request))
265
0
            .await
266
0
            .err_tip(|| "in GrpcStore::read")?
267
0
            .into_inner();
268
0
        let first_response = response
269
0
            .message()
270
0
            .await
271
0
            .err_tip(|| "Fetching first chunk in GrpcStore::read()")?;
272
0
        Ok(FirstStream::new(first_response, response))
273
0
    }
274
275
0
    pub async fn read(
276
0
        &self,
277
0
        grpc_request: impl IntoRequest<ReadRequest>,
278
0
    ) -> Result<impl Stream<Item = Result<ReadResponse, Status>>, Error> {
279
0
        error_if!(
280
0
            matches!(self.store_type, nativelink_config::stores::StoreType::ac),
281
            "CAS operation on AC store"
282
        );
283
284
0
        let request = self.get_read_request(grpc_request.into_request().into_inner())?;
285
0
        self.perform_request(request, |request| async move {
286
0
            self.read_internal(request).await
287
0
        })
288
0
        .await
289
0
    }
290
291
0
    pub async fn write<T, E>(
292
0
        &self,
293
0
        stream: WriteRequestStreamWrapper<T, E>,
294
0
    ) -> Result<Response<WriteResponse>, Error>
295
0
    where
296
0
        T: Stream<Item = Result<WriteRequest, E>> + Unpin + Send + 'static,
297
0
        E: Into<Error> + 'static,
298
0
    {
299
0
        error_if!(
300
0
            matches!(self.store_type, nativelink_config::stores::StoreType::ac),
301
            "CAS operation on AC store"
302
        );
303
304
0
        let local_state = Arc::new(Mutex::new(WriteState::new(
305
0
            self.instance_name.clone(),
306
0
            stream,
307
0
        )));
308
309
0
        let result = self
310
0
            .retrier
311
0
            .retry(unfold(local_state, move |local_state| async move {
312
                // The client write may occur on a separate thread and
313
                // therefore in order to share the state with it we have to
314
                // wrap it in a Mutex and retrieve it after the write
315
                // has completed.  There is no way to get the value back
316
                // from the client.
317
0
                let result = self
318
0
                    .connection_manager
319
0
                    .connection()
320
0
                    .and_then(|channel| async {
321
0
                        ByteStreamClient::new(channel)
322
0
                            .write(WriteStateWrapper::new(local_state.clone()))
323
0
                            .await
324
0
                            .err_tip(|| "in GrpcStore::write")
325
0
                    })
326
0
                    .await;
327
328
                // Get the state back from StateWrapper, this should be
329
                // uncontended since write has returned.
330
0
                let mut local_state_locked = local_state.lock();
331
332
0
                let result = if let Some(err) = local_state_locked.take_read_stream_error() {
  Branch (332:37): [True: 0, False: 0]
  Branch (332:37): [True: 0, False: 0]
  Branch (332:37): [Folded - Ignored]
333
                    // If there was an error with the stream, then don't retry.
334
0
                    RetryResult::Err(err.append("Where read_stream_error was set"))
335
                } else {
336
                    // On error determine whether it is possible to retry.
337
0
                    match result {
338
0
                        Err(err) => {
339
0
                            if local_state_locked.can_resume() {
  Branch (339:32): [True: 0, False: 0]
  Branch (339:32): [True: 0, False: 0]
  Branch (339:32): [Folded - Ignored]
340
0
                                local_state_locked.resume();
341
0
                                RetryResult::Retry(err)
342
                            } else {
343
0
                                RetryResult::Err(err.append("Retry is not possible"))
344
                            }
345
                        }
346
0
                        Ok(response) => RetryResult::Ok(response),
347
                    }
348
                };
349
350
0
                drop(local_state_locked);
351
0
                Some((result, local_state))
352
0
            }))
353
0
            .await?;
354
0
        Ok(result)
355
0
    }
356
357
0
    pub async fn query_write_status(
358
0
        &self,
359
0
        grpc_request: Request<QueryWriteStatusRequest>,
360
0
    ) -> Result<Response<QueryWriteStatusResponse>, Error> {
361
        const IS_UPLOAD_TRUE: bool = true;
362
363
0
        error_if!(
364
0
            matches!(self.store_type, nativelink_config::stores::StoreType::ac),
365
            "CAS operation on AC store"
366
        );
367
368
0
        let mut request = grpc_request.into_inner();
369
370
0
        let mut request_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_TRUE)?;
371
0
        if request_info.instance_name != self.instance_name {
  Branch (371:12): [True: 0, False: 0]
  Branch (371:12): [Folded - Ignored]
  Branch (371:12): [Folded - Ignored]
372
0
            request_info.instance_name = Cow::Borrowed(&self.instance_name);
373
0
            request.resource_name = request_info.to_string(IS_UPLOAD_TRUE);
374
0
        }
375
376
0
        self.perform_request(request, |request| async move {
377
0
            let channel = self
378
0
                .connection_manager
379
0
                .connection()
380
0
                .await
381
0
                .err_tip(|| "in query_write_status")?;
382
0
            ByteStreamClient::new(channel)
383
0
                .query_write_status(Request::new(request))
384
0
                .await
385
0
                .err_tip(|| "in GrpcStore::query_write_status")
386
0
        })
387
0
        .await
388
0
    }
389
390
0
    pub async fn get_action_result(
391
0
        &self,
392
0
        grpc_request: Request<GetActionResultRequest>,
393
0
    ) -> Result<Response<ActionResult>, Error> {
394
0
        let mut request = grpc_request.into_inner();
395
0
        request.instance_name.clone_from(&self.instance_name);
396
0
        self.perform_request(request, |request| async move {
397
0
            let channel = self
398
0
                .connection_manager
399
0
                .connection()
400
0
                .await
401
0
                .err_tip(|| "in get_action_result")?;
402
0
            ActionCacheClient::new(channel)
403
0
                .get_action_result(Request::new(request))
404
0
                .await
405
0
                .err_tip(|| "in GrpcStore::get_action_result")
406
0
        })
407
0
        .await
408
0
    }
409
410
0
    pub async fn update_action_result(
411
0
        &self,
412
0
        grpc_request: Request<UpdateActionResultRequest>,
413
0
    ) -> Result<Response<ActionResult>, Error> {
414
0
        let mut request = grpc_request.into_inner();
415
0
        request.instance_name.clone_from(&self.instance_name);
416
0
        self.perform_request(request, |request| async move {
417
0
            let channel = self
418
0
                .connection_manager
419
0
                .connection()
420
0
                .await
421
0
                .err_tip(|| "in update_action_result")?;
422
0
            ActionCacheClient::new(channel)
423
0
                .update_action_result(Request::new(request))
424
0
                .await
425
0
                .err_tip(|| "in GrpcStore::update_action_result")
426
0
        })
427
0
        .await
428
0
    }
429
430
0
    async fn get_action_result_from_digest(
431
0
        &self,
432
0
        digest: DigestInfo,
433
0
    ) -> Result<Response<ActionResult>, Error> {
434
0
        let action_result_request = GetActionResultRequest {
435
0
            instance_name: self.instance_name.clone(),
436
0
            action_digest: Some(digest.into()),
437
0
            inline_stdout: false,
438
0
            inline_stderr: false,
439
0
            inline_output_files: Vec::new(),
440
0
            digest_function: ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC)
441
0
                .err_tip(|| "In get_action_from_store")?
442
0
                .map_or_else(default_digest_hasher_func, |v| *v)
443
0
                .proto_digest_func()
444
0
                .into(),
445
0
        };
446
0
        self.get_action_result(Request::new(action_result_request))
447
0
            .await
448
0
    }
449
450
0
    async fn get_action_result_as_part(
451
0
        &self,
452
0
        digest: DigestInfo,
453
0
        writer: &mut DropCloserWriteHalf,
454
0
        offset: usize,
455
0
        length: Option<usize>,
456
0
    ) -> Result<(), Error> {
457
0
        let action_result = self
458
0
            .get_action_result_from_digest(digest)
459
0
            .await
460
0
            .map(Response::into_inner)
461
0
            .err_tip(|| "Action result not found")?;
462
        // TODO: Would be better to avoid all the encoding and decoding in this
463
        //       file, however there's no way to currently get raw bytes from a
464
        //       generated prost request unfortunately.
465
0
        let mut value = BytesMut::new();
466
0
        action_result
467
0
            .encode(&mut value)
468
0
            .err_tip(|| "Could not encode upstream action result")?;
469
470
0
        let default_len = value.len() - offset;
471
0
        let length = length.unwrap_or(default_len).min(default_len);
472
0
        if length > 0 {
  Branch (472:12): [True: 0, False: 0]
  Branch (472:12): [Folded - Ignored]
473
0
            writer
474
0
                .send(value.freeze().slice(offset..offset + length))
475
0
                .await
476
0
                .err_tip(|| "Failed to write data in grpc store")?;
477
0
        }
478
0
        writer
479
0
            .send_eof()
480
0
            .err_tip(|| "Failed to write EOF in grpc store get_action_result_as_part")?;
481
0
        Ok(())
482
0
    }
483
484
0
    async fn update_action_result_from_bytes(
485
0
        &self,
486
0
        digest: DigestInfo,
487
0
        mut reader: DropCloserReadHalf,
488
0
    ) -> Result<(), Error> {
489
0
        let action_result = ActionResult::decode(reader.consume(None).await?)
490
0
            .err_tip(|| "Failed to decode ActionResult in update_action_result_from_bytes")?;
491
0
        let update_action_request = UpdateActionResultRequest {
492
0
            instance_name: self.instance_name.clone(),
493
0
            action_digest: Some(digest.into()),
494
0
            action_result: Some(action_result),
495
0
            results_cache_policy: None,
496
0
            digest_function: ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC)
497
0
                .err_tip(|| "In get_action_from_store")?
498
0
                .map_or_else(default_digest_hasher_func, |v| *v)
499
0
                .proto_digest_func()
500
0
                .into(),
501
0
        };
502
0
        self.update_action_result(Request::new(update_action_request))
503
0
            .await
504
0
            .map(|_| ())
505
0
    }
506
}
507
508
#[async_trait]
509
impl StoreDriver for GrpcStore {
510
    // NOTE: This function can only be safely used on CAS stores. AC stores may return a size that
511
    // is incorrect.
512
    async fn has_with_results(
513
        self: Pin<&Self>,
514
        keys: &[StoreKey<'_>],
515
        results: &mut [Option<u64>],
516
0
    ) -> Result<(), Error> {
517
0
        if matches!(self.store_type, nativelink_config::stores::StoreType::ac) {
518
0
            keys.iter()
519
0
                .zip(results.iter_mut())
520
0
                .map(|(key, result)| async move {
521
0
                    // The length of an AC is incorrect, so we don't figure out the
522
0
                    // length, instead the biggest possible result is returned in the
523
0
                    // hope that we detect incorrect usage.
524
0
                    self.get_action_result_from_digest(key.borrow().into_digest())
525
0
                        .await?;
526
0
                    *result = Some(u64::MAX);
527
0
                    Ok::<_, Error>(())
528
0
                })
529
0
                .collect::<FuturesUnordered<_>>()
530
0
                .try_for_each(|_| future::ready(Ok(())))
531
0
                .await
532
0
                .err_tip(|| "Getting upstream action cache entry")?;
533
0
            return Ok(());
534
0
        }
535
536
0
        let missing_blobs_response = self
537
0
            .find_missing_blobs(Request::new(FindMissingBlobsRequest {
538
0
                instance_name: self.instance_name.clone(),
539
0
                blob_digests: keys
540
0
                    .iter()
541
0
                    .map(|k| k.borrow().into_digest().into())
542
0
                    .collect(),
543
0
                digest_function: ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC)
544
0
                    .err_tip(|| "In GrpcStore::has_with_results")?
545
0
                    .map_or_else(default_digest_hasher_func, |v| *v)
546
0
                    .proto_digest_func()
547
0
                    .into(),
548
0
            }))
549
0
            .await?
550
0
            .into_inner();
551
0
552
0
        // Since the ordering is not guaranteed above, the matching has to check
553
0
        // all missing blobs against all entries in the unsorted digest list.
554
0
        // To optimise this, the missing digests are sorted and then it is
555
0
        // efficient to perform a binary search for each digest within the
556
0
        // missing list.
557
0
        let mut missing_digests =
558
0
            Vec::with_capacity(missing_blobs_response.missing_blob_digests.len());
559
0
        for missing_digest in missing_blobs_response.missing_blob_digests {
560
0
            missing_digests.push(DigestInfo::try_from(missing_digest)?);
561
        }
562
0
        missing_digests.sort_unstable();
563
0
        for (digest, result) in keys
564
0
            .iter()
565
0
            .map(|v| v.borrow().into_digest())
566
0
            .zip(results.iter_mut())
567
        {
568
0
            match missing_digests.binary_search(&digest) {
569
0
                Ok(_) => *result = None,
570
0
                Err(_) => *result = Some(digest.size_bytes()),
571
            }
572
        }
573
574
0
        Ok(())
575
0
    }
576
577
    async fn update(
578
        self: Pin<&Self>,
579
        key: StoreKey<'_>,
580
        reader: DropCloserReadHalf,
581
        _size_info: UploadSizeInfo,
582
0
    ) -> Result<(), Error> {
583
0
        let digest = key.into_digest();
584
0
        if matches!(self.store_type, nativelink_config::stores::StoreType::ac) {
585
0
            return self.update_action_result_from_bytes(digest, reader).await;
586
0
        }
587
0
588
0
        let mut buf = Uuid::encode_buffer();
589
0
        let resource_name = format!(
590
0
            "{}/uploads/{}/blobs/{}/{}",
591
0
            &self.instance_name,
592
0
            Uuid::new_v4().hyphenated().encode_lower(&mut buf),
593
0
            digest.packed_hash(),
594
0
            digest.size_bytes(),
595
0
        );
596
597
        struct LocalState {
598
            resource_name: String,
599
            reader: DropCloserReadHalf,
600
            did_error: bool,
601
            bytes_received: i64,
602
        }
603
0
        let local_state = LocalState {
604
0
            resource_name,
605
0
            reader,
606
0
            did_error: false,
607
0
            bytes_received: 0,
608
0
        };
609
0
610
0
        let stream = Box::pin(unfold(local_state, |mut local_state| async move {
611
0
            if local_state.did_error {
  Branch (611:16): [True: 0, False: 0]
  Branch (611:16): [Folded - Ignored]
612
0
                event!(
613
0
                    Level::ERROR,
614
0
                    "GrpcStore::update() polled stream after error was returned"
615
                );
616
0
                return None;
617
0
            }
618
0
            let data = match local_state
619
0
                .reader
620
0
                .recv()
621
0
                .await
622
0
                .err_tip(|| "In GrpcStore::update()")
623
            {
624
0
                Ok(data) => data,
625
0
                Err(err) => {
626
0
                    local_state.did_error = true;
627
0
                    return Some((Err(err), local_state));
628
                }
629
            };
630
631
0
            let write_offset = local_state.bytes_received;
632
0
            local_state.bytes_received += data.len() as i64;
633
0
634
0
            Some((
635
0
                Ok(WriteRequest {
636
0
                    resource_name: local_state.resource_name.clone(),
637
0
                    write_offset,
638
0
                    finish_write: data.is_empty(), // EOF is when no data was polled.
639
0
                    data,
640
0
                }),
641
0
                local_state,
642
0
            ))
643
0
        }));
644
0
645
0
        self.write(
646
0
            WriteRequestStreamWrapper::from(stream)
647
0
                .await
648
0
                .err_tip(|| "in GrpcStore::update()")?,
649
        )
650
0
        .await
651
0
        .err_tip(|| "in GrpcStore::update()")?;
652
653
0
        Ok(())
654
0
    }
655
656
    async fn get_part(
657
        self: Pin<&Self>,
658
        key: StoreKey<'_>,
659
        writer: &mut DropCloserWriteHalf,
660
        offset: u64,
661
        length: Option<u64>,
662
0
    ) -> Result<(), Error> {
663
0
        let digest = key.into_digest();
664
0
        if matches!(self.store_type, nativelink_config::stores::StoreType::ac) {
665
0
            let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?;
666
0
            let length = length
667
0
                .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
668
0
                .transpose()?;
669
670
0
            return self
671
0
                .get_action_result_as_part(digest, writer, offset, length)
672
0
                .await;
673
0
        }
674
0
675
0
        // Shortcut for empty blobs.
676
0
        if digest.size_bytes() == 0 {
  Branch (676:12): [True: 0, False: 0]
  Branch (676:12): [Folded - Ignored]
677
0
            return writer.send_eof();
678
0
        }
679
0
680
0
        let resource_name = format!(
681
0
            "{}/blobs/{}/{}",
682
0
            &self.instance_name,
683
0
            digest.packed_hash(),
684
0
            digest.size_bytes(),
685
0
        );
686
687
        struct LocalState<'a> {
688
            resource_name: String,
689
            writer: &'a mut DropCloserWriteHalf,
690
            read_offset: i64,
691
            read_limit: i64,
692
        }
693
694
0
        let local_state = LocalState {
695
0
            resource_name,
696
0
            writer,
697
0
            read_offset: i64::try_from(offset).err_tip(|| "Could not convert offset to i64")?,
698
0
            read_limit: i64::try_from(length.unwrap_or(0))
699
0
                .err_tip(|| "Could not convert length to i64")?,
700
        };
701
702
0
        self.retrier
703
0
            .retry(unfold(local_state, move |mut local_state| async move {
704
0
                let request = ReadRequest {
705
0
                    resource_name: local_state.resource_name.clone(),
706
0
                    read_offset: local_state.read_offset,
707
0
                    read_limit: local_state.read_limit,
708
0
                };
709
0
                let mut stream = match self
710
0
                    .read_internal(request)
711
0
                    .await
712
0
                    .err_tip(|| "in GrpcStore::get_part()")
713
                {
714
0
                    Ok(stream) => stream,
715
0
                    Err(err) => return Some((RetryResult::Retry(err), local_state)),
716
                };
717
718
                loop {
719
0
                    let data = match stream.next().await {
720
                        // Create an empty response to represent EOF.
721
0
                        None => bytes::Bytes::new(),
722
0
                        Some(Ok(message)) => message.data,
723
0
                        Some(Err(status)) => {
724
0
                            return Some((
725
0
                                RetryResult::Retry(
726
0
                                    Into::<Error>::into(status)
727
0
                                        .append("While fetching message in GrpcStore::get_part()"),
728
0
                                ),
729
0
                                local_state,
730
0
                            ))
731
                        }
732
                    };
733
0
                    let length = data.len() as i64;
734
0
                    // This is the usual exit from the loop at EOF.
735
0
                    if length == 0 {
  Branch (735:24): [True: 0, False: 0]
  Branch (735:24): [Folded - Ignored]
736
0
                        let eof_result = local_state
737
0
                            .writer
738
0
                            .send_eof()
739
0
                            .err_tip(|| "Could not send eof in GrpcStore::get_part()")
740
0
                            .map_or_else(RetryResult::Err, RetryResult::Ok);
741
0
                        return Some((eof_result, local_state));
742
0
                    }
743
                    // Forward the data upstream.
744
0
                    if let Err(err) = local_state
  Branch (744:28): [True: 0, False: 0]
  Branch (744:28): [Folded - Ignored]
745
0
                        .writer
746
0
                        .send(data)
747
0
                        .await
748
0
                        .err_tip(|| "While sending in GrpcStore::get_part()")
749
                    {
750
0
                        return Some((RetryResult::Err(err), local_state));
751
0
                    }
752
0
                    local_state.read_offset += length;
753
                }
754
0
            }))
755
0
            .await
756
0
    }
757
758
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
759
0
        self
760
0
    }
761
762
0
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
763
0
        self
764
0
    }
765
766
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
767
0
        self
768
0
    }
769
}
770
771
default_health_status_indicator!(GrpcStore);