Coverage Report

Created: 2025-12-17 22:46

/build/source/nativelink-store/src/gcs_store.rs
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    See LICENSE file for details
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::fmt::Debug;
use core::pin::Pin;
use std::borrow::Cow;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::{FuturesUnordered, unfold};
use futures::{StreamExt, TryStreamExt};
use nativelink_config::stores::ExperimentalGcsSpec;
use nativelink_error::{Code, Error, ResultExt, make_err};
use nativelink_metric::MetricsComponent;
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
use nativelink_util::instant_wrapper::InstantWrapper;
use nativelink_util::retry::{Retrier, RetryResult};
use nativelink_util::store_trait::{
    RemoveItemCallback, StoreDriver, StoreKey, StoreOptimizations, UploadSizeInfo,
};
use rand::Rng;
use tokio::time::sleep;

use crate::cas_utils::is_zero_digest;
use crate::gcs_client::client::{GcsClient, GcsOperations};
use crate::gcs_client::types::{
    CHUNK_SIZE, DEFAULT_CONCURRENT_UPLOADS, DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST,
    MIN_MULTIPART_SIZE, ObjectPath,
};

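/// A `StoreDriver` implementation backed by Google Cloud Storage. All bucket
/// traffic goes through the `GcsOperations` trait, which lets tests inject a
/// mock client via `new_with_ops`.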
#[derive(MetricsComponent, Debug)]
pub struct GcsStore<Client: GcsOperations, NowFn> {
    client: Arc<Client>,
    now_fn: NowFn,
    #[metric(help = "The bucket name for the GCS store")]
    bucket: String,
    #[metric(help = "The key prefix for the GCS store")]
    key_prefix: String,
    retrier: Retrier,
    #[metric(help = "The number of seconds to consider an object expired")]
    consider_expired_after_s: i64,
    #[metric(help = "The number of bytes to buffer for retrying requests")]
    max_retry_buffer_size: usize,
    #[metric(help = "The size of chunks for resumable uploads")]
    max_chunk_size: usize,
    #[metric(help = "The number of concurrent uploads allowed")]
    max_concurrent_uploads: usize,
}

impl<I, NowFn> GcsStore<GcsClient, NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
{
    pub async fn new(spec: &ExperimentalGcsSpec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
        let client = Arc::new(GcsClient::new(spec).await?);
        Self::new_with_ops(spec, client, now_fn)
    }
}

impl<I, Client, NowFn> GcsStore<Client, NowFn>
where
    I: InstantWrapper,
    Client: GcsOperations + Send + Sync,
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
{
    // Primarily used for injecting a mock or real operations implementation.
    pub fn new_with_ops(
        spec: &ExperimentalGcsSpec,
        client: Arc<Client>,
        now_fn: NowFn,
    ) -> Result<Arc<Self>, Error> {
        // Chunks must be a multiple of 256 KiB according to the documentation.
        const CHUNK_MULTIPLE: usize = 256 * 1024;

        let max_connections = spec
            .common
            .multipart_max_concurrent_uploads
            .unwrap_or(DEFAULT_CONCURRENT_UPLOADS);

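        // Each retry delay is scaled by a random factor in
        // [1 - jitter/2, 1 + jitter/2); a jitter of 0.0 leaves delays
        // unchanged.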
        let jitter_amt = spec.common.retry.jitter;
        let jitter_fn = Arc::new(move |delay: tokio::time::Duration| {
            if jitter_amt == 0.0 {
                return delay;
            }
            delay.mul_f32(jitter_amt.mul_add(rand::rng().random::<f32>() - 0.5, 1.))
        });

        let max_chunk_size =
            core::cmp::min(spec.resumable_chunk_size.unwrap_or(CHUNK_SIZE), CHUNK_SIZE);

        let max_chunk_size = if max_chunk_size.is_multiple_of(CHUNK_MULTIPLE) {
            max_chunk_size
        } else {
            ((max_chunk_size + CHUNK_MULTIPLE / 2) / CHUNK_MULTIPLE) * CHUNK_MULTIPLE
        };
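        // The rounding above, illustrated with made-up numbers: a requested
        // 300 KiB chunk becomes ((307_200 + 131_072) / 262_144) * 262_144
        // = 262_144 bytes, i.e. the nearest 256 KiB multiple.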

        let max_retry_buffer_size = spec
            .common
            .max_retry_buffer_per_request
            .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST);

        // The retry buffer should be at least as big as the chunk size.
        let max_retry_buffer_size = if max_retry_buffer_size < max_chunk_size {
            max_chunk_size
        } else {
            max_retry_buffer_size
        };

        Ok(Arc::new(Self {
            client,
            now_fn,
            bucket: spec.bucket.clone(),
            key_prefix: spec
                .common
                .key_prefix
                .as_ref()
                .unwrap_or(&String::new())
                .clone(),
            retrier: Retrier::new(
                Arc::new(|duration| Box::pin(sleep(duration))),
                jitter_fn,
                spec.common.retry.clone(),
            ),
            consider_expired_after_s: i64::from(spec.common.consider_expired_after_s),
            max_retry_buffer_size,
            max_chunk_size,
            max_concurrent_uploads: max_connections,
        }))
    }

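    // Returns the object size if the key exists (and has not outlived
    // `consider_expired_after_s`), or `None` to signal a cache miss. Note the
    // pattern used throughout this file: `unfold` wraps the request in a
    // stream that carries its state across attempts, and `Retrier::retry`
    // polls that stream until it yields a terminal `RetryResult`.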
    async fn has(self: Pin<&Self>, key: &StoreKey<'_>) -> Result<Option<u64>, Error> {
        let object_path = self.make_object_path(key);
        let client = &self.client;
        let consider_expired_after_s = self.consider_expired_after_s;
        let now_fn = &self.now_fn;

        self.retrier
            .retry(unfold(object_path, move |object_path| async move {
                match client.read_object_metadata(&object_path).await.err_tip(|| {
                    format!(
                        "Error while trying to read - bucket: {} path: {}",
                        object_path.bucket, object_path.path
                    )
                }) {
                    Ok(Some(metadata)) => {
                        if consider_expired_after_s != 0 {
                            if let Some(update_time) = &metadata.update_time {
                                let now_s = now_fn().unix_timestamp() as i64;
                                if update_time.seconds + consider_expired_after_s <= now_s {
                                    return Some((RetryResult::Ok(None), object_path));
                                }
                            }
                        }

                        if metadata.size >= 0 {
                            Some((RetryResult::Ok(Some(metadata.size as u64)), object_path))
                        } else {
                            Some((
                                RetryResult::Err(make_err!(
                                    Code::InvalidArgument,
                                    "Invalid metadata size in GCS: {}",
                                    metadata.size
                                )),
                                object_path,
                            ))
                        }
                    }
                    Ok(None) => Some((RetryResult::Ok(None), object_path)),
                    Err(e) if e.code == Code::NotFound => {
                        Some((RetryResult::Ok(None), object_path))
                    }
                    Err(e) => Some((RetryResult::Retry(e), object_path)),
                }
            }))
            .await
    }

    fn make_object_path(&self, key: &StoreKey) -> ObjectPath {
        ObjectPath::new(
            self.bucket.clone(),
            &format!("{}{}", self.key_prefix, key.as_str()),
        )
    }
}

#[async_trait]
impl<I, Client, NowFn> StoreDriver for GcsStore<Client, NowFn>
where
    I: InstantWrapper,
    Client: GcsOperations + 'static,
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
{
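    // Existence checks for all keys run concurrently via `FuturesUnordered`;
    // zero digests are answered locally without a GCS round trip.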
    async fn has_with_results(
        self: Pin<&Self>,
        keys: &[StoreKey<'_>],
        results: &mut [Option<u64>],
    ) -> Result<(), Error> {
        keys.iter()
            .zip(results.iter_mut())
            .map(|(key, result)| async move {
                if is_zero_digest(key.borrow()) {
                    *result = Some(0);
                    return Ok(());
                }
                *result = self.has(key).await?;
                Ok(())
            })
            .collect::<FuturesUnordered<_>>()
            .try_collect()
            .await
    }

    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
        matches!(optimization, StoreOptimizations::LazyExistenceOnSync)
    }

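    // Uploads either as a single `write_object` call (known size below
    // `MIN_MULTIPART_SIZE`) or as a chunked resumable-upload session.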
    async fn update(
        self: Pin<&Self>,
        digest: StoreKey<'_>,
        mut reader: DropCloserReadHalf,
        upload_size: UploadSizeInfo,
    ) -> Result<(), Error> {
        if is_zero_digest(digest.borrow()) {
            return reader.recv().await.and_then(|should_be_empty| {
                should_be_empty
                    .is_empty()
                    .then_some(())
                    .ok_or_else(|| make_err!(Code::Internal, "Zero byte hash not empty"))
            });
        }

        let object_path = self.make_object_path(&digest);

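        // Let the reader buffer enough recent data that a failed request can
        // be replayed from memory when the retrier runs the attempt again.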
        reader.set_max_recent_data_size(
            u64::try_from(self.max_retry_buffer_size)
                .err_tip(|| "Could not convert max_retry_buffer_size to u64")?,
        );

        // For small files with an exact known size, use a simple upload.
        if let UploadSizeInfo::ExactSize(size) = upload_size {
            if size < MIN_MULTIPART_SIZE {
                let content = reader.consume(Some(usize::try_from(size)?)).await?;
                let client = &self.client;

                return self
                    .retrier
                    .retry(unfold(content, |content| async {
                        match client.write_object(&object_path, content.to_vec()).await {
                            Ok(()) => Some((RetryResult::Ok(()), content)),
                            Err(e) => Some((RetryResult::Retry(e), content)),
                        }
                    }))
                    .await;
            }
        }

        // For larger files, use a resumable upload and stream the data in
        // chunks.
        let mut offset = 0u64;
        let mut total_size = if let UploadSizeInfo::ExactSize(size) = upload_size {
            Some(size)
        } else {
            None
        };
        let mut upload_id: Option<String> = None;
        let client = &self.client;

        loop {
            let chunk = reader.consume(Some(self.max_chunk_size)).await?;
            if chunk.is_empty() {
                break;
            }
            // If a full chunk wasn't read, then this is the full length.
            if chunk.len() < self.max_chunk_size {
                total_size = Some(offset + chunk.len() as u64);
            }

            let upload_id_ref = if let Some(upload_id_ref) = &upload_id {
                upload_id_ref
            } else {
                // Initiate the upload session on the first non-empty chunk.
                upload_id = Some(
                    self.retrier
                        .retry(unfold((), |()| async {
                            match client.start_resumable_write(&object_path).await {
                                Ok(id) => Some((RetryResult::Ok(id), ())),
                                Err(e) => Some((
                                    RetryResult::Retry(make_err!(
                                        Code::Aborted,
                                        "Failed to start resumable upload: {:?}",
                                        e
                                    )),
                                    (),
                                )),
                            }
                        }))
                        .await?,
                );
                upload_id.as_deref().unwrap()
            };

            let current_offset = offset;
            offset += chunk.len() as u64;

            // Upload the chunk with retries.
            let object_path_ref = &object_path;
            self.retrier
                .retry(unfold(chunk, |chunk| async move {
                    match client
                        .upload_chunk(
                            upload_id_ref,
                            object_path_ref,
                            chunk.clone(),
                            current_offset,
                            offset,
                            total_size,
                        )
                        .await
                    {
                        Ok(()) => Some((RetryResult::Ok(()), chunk)),
                        Err(e) => Some((RetryResult::Retry(e), chunk)),
                    }
                }))
                .await?;
        }

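        // A resumable session is only finalized once a chunk is uploaded with
        // the total object size known; until then the upload is treated as
        // still in progress.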
        // Handle the case where the stream was of unknown length and happened
        // to be an exact multiple of the chunk size.
        if let Some(upload_id_ref) = &upload_id {
            if total_size.is_none() {
                let object_path_ref = &object_path;
                self.retrier
                    .retry(unfold((), |()| async move {
                        match client
                            .upload_chunk(
                                upload_id_ref,
                                object_path_ref,
                                Bytes::new(),
                                offset,
                                offset,
                                Some(offset),
                            )
                            .await
                        {
                            Ok(()) => Some((RetryResult::Ok(()), ())),
                            Err(e) => Some((RetryResult::Retry(e), ())),
                        }
                    }))
                    .await?;
            }
        } else {
            // No chunk was ever read, so the streamed file was empty.
            return self
                .retrier
                .retry(unfold((), |()| async {
                    match client.write_object(&object_path, Vec::new()).await {
                        Ok(()) => Some((RetryResult::Ok(()), ())),
                        Err(e) => Some((RetryResult::Retry(e), ())),
                    }
                }))
                .await;
        }

        // Verify that the upload completed successfully.
        self.retrier
            .retry(unfold((), |()| async {
                match client.object_exists(&object_path).await {
                    Ok(true) => Some((RetryResult::Ok(()), ())),
                    Ok(false) => Some((
                        RetryResult::Retry(make_err!(
                            Code::Internal,
                            "Object not found after upload completion"
                        )),
                        (),
                    )),
                    Err(e) => Some((RetryResult::Retry(e), ())),
                }
            }))
            .await?;

        Ok(())
    }

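    // Streams a byte range of the object into `writer`. The read offset lives
    // in the `unfold` state, so a retried attempt resumes from the last byte
    // successfully forwarded instead of restarting from the beginning.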
    async fn get_part(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        writer: &mut DropCloserWriteHalf,
        offset: u64,
        length: Option<u64>,
    ) -> Result<(), Error> {
        if is_zero_digest(key.borrow()) {
            writer.send_eof()?;
            return Ok(());
        }

        let object_path = self.make_object_path(&key);
        let end_offset = length.map(|len| offset + len);
        let client = &self.client;

        let object_path_ref = &object_path;
        self.retrier
            .retry(unfold(
                (offset, writer),
                |(mut offset, writer)| async move {
                    let mut stream = match client
                        .read_object_content(object_path_ref, offset, end_offset)
                        .await
                    {
                        Ok(stream) => stream,
                        Err(e) if e.code == Code::NotFound => {
                            return Some((RetryResult::Err(e), (offset, writer)));
                        }
                        Err(e) => return Some((RetryResult::Retry(e), (offset, writer))),
                    };

                    while let Some(next_chunk) = stream.next().await {
                        match next_chunk {
                            Ok(bytes) => {
                                offset += bytes.len() as u64;
                                if let Err(err) = writer.send(bytes).await {
                                    return Some((RetryResult::Err(err), (offset, writer)));
                                }
                            }
                            Err(err) => return Some((RetryResult::Retry(err), (offset, writer))),
                        }
                    }

                    if let Err(err) = writer.send_eof() {
                        return Some((RetryResult::Err(err), (offset, writer)));
                    }

                    Some((RetryResult::Ok(()), (offset, writer)))
                },
            ))
            .await
    }

    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
        self
    }

    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
        self
    }

    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
        self
    }

    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
        registry.register_indicator(self);
    }

    fn register_remove_callback(
        self: Arc<Self>,
        _callback: Arc<dyn RemoveItemCallback>,
    ) -> Result<(), Error> {
        // Since we're backed by GCS, this store never removes items itself,
        // so the callback can safely be ignored.
        Ok(())
    }
}

#[async_trait]
impl<I, Client, NowFn> HealthStatusIndicator for GcsStore<Client, NowFn>
where
    I: InstantWrapper,
    Client: GcsOperations + 'static,
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
{
    fn get_name(&self) -> &'static str {
        "GcsStore"
    }

    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
        StoreDriver::check_health(Pin::new(self), namespace).await
    }
}