Coverage Report

Created: 2025-07-10 19:59

/build/source/nativelink-store/src/gcs_store.rs
Line | Count | Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::Debug;
16
use core::pin::Pin;
17
use std::borrow::Cow;
18
use std::sync::Arc;
19
20
use async_trait::async_trait;
21
use bytes::Bytes;
22
use futures::stream::{FuturesUnordered, unfold};
23
use futures::{StreamExt, TryStreamExt};
24
use nativelink_config::stores::ExperimentalGcsSpec;
25
use nativelink_error::{Code, Error, ResultExt, make_err};
26
use nativelink_metric::MetricsComponent;
27
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
28
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
29
use nativelink_util::instant_wrapper::InstantWrapper;
30
use nativelink_util::retry::{Retrier, RetryResult};
31
use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
32
use rand::Rng;
33
use tokio::time::sleep;
34
35
use crate::cas_utils::is_zero_digest;
36
use crate::gcs_client::client::{GcsClient, GcsOperations};
37
use crate::gcs_client::types::{
38
    CHUNK_SIZE, DEFAULT_CONCURRENT_UPLOADS, DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST,
39
    MIN_MULTIPART_SIZE, ObjectPath,
40
};
41
42
#[derive(MetricsComponent, Debug)]
43
pub struct GcsStore<NowFn> {
44
    client: Arc<dyn GcsOperations>,
45
    now_fn: NowFn,
46
    #[metric(help = "The bucket name for the GCS store")]
47
    bucket: String,
48
    #[metric(help = "The key prefix for the GCS store")]
49
    key_prefix: String,
50
    retrier: Retrier,
51
    #[metric(help = "The number of seconds to consider an object expired")]
52
    consider_expired_after_s: i64,
53
    #[metric(help = "The number of bytes to buffer for retrying requests")]
54
    max_retry_buffer_size: usize,
55
    #[metric(help = "The size of chunks for resumable uploads")]
56
    max_chunk_size: usize,
57
    #[metric(help = "The number of concurrent uploads allowed")]
58
    max_concurrent_uploads: usize,
59
}
60
61
impl<I, NowFn> GcsStore<NowFn>
62
where
63
    I: InstantWrapper,
64
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
65
{
66
0
    pub async fn new(spec: &ExperimentalGcsSpec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
67
0
        let client = GcsClient::new(spec).await?;
68
0
        let client: Arc<dyn GcsOperations> = Arc::new(client);
69
70
0
        Self::new_with_ops(spec, client, now_fn)
71
0
    }
72
73
    // Primarily used for injecting a mock or real operations implementation; see the injection sketch after this function.
74
12
    pub fn new_with_ops(
75
12
        spec: &ExperimentalGcsSpec,
76
12
        client: Arc<dyn GcsOperations>,
77
12
        now_fn: NowFn,
78
12
    ) -> Result<Arc<Self>, Error> {
79
        // Chunks must be a multiple of 256 KiB according to the documentation.
80
        const CHUNK_MULTIPLE: usize = 256 * 1024;
81
82
12
        let max_connections = spec
83
12
            .common
84
12
            .multipart_max_concurrent_uploads
85
12
            .unwrap_or(DEFAULT_CONCURRENT_UPLOADS);
86
87
12
        let jitter_amt = spec.common.retry.jitter;
88
12
        let jitter_fn = Arc::new(move |delay: tokio::time::Duration| {
89
0
            if jitter_amt == 0.0 {
  Branch (89:16): [True: 0, False: 0]
  Branch (89:16): [True: 0, False: 0]
  Branch (89:16): [Folded - Ignored]
90
0
                return delay;
91
0
            }
92
0
            delay.mul_f32(jitter_amt.mul_add(rand::rng().random::<f32>() - 0.5, 1.))
93
0
        });
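
Aside: the jitter closure above scales each retry delay by a random factor. A
minimal, self-contained sketch of that arithmetic (values are illustrative;
`rand::rng().random::<f32>()` yields a float in [0, 1)):

    use std::time::Duration;

    // f32::mul_add(a, b) computes self * a + b, so the expression below is
    // delay * (1.0 + jitter_amt * (r - 0.5)).
    fn jittered(delay: Duration, jitter_amt: f32, r: f32) -> Duration {
        delay.mul_f32(jitter_amt.mul_add(r - 0.5, 1.0))
    }

    fn main() {
        // With jitter_amt = 0.5, a 1s delay lands anywhere in [0.75s, 1.25s).
        assert_eq!(jittered(Duration::from_secs(1), 0.5, 0.0), Duration::from_millis(750));
        assert_eq!(jittered(Duration::from_secs(1), 0.5, 0.5), Duration::from_secs(1));
    }
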
94
95
12
        let max_chunk_size =
96
12
            core::cmp::min(spec.resumable_chunk_size.unwrap_or(CHUNK_SIZE), CHUNK_SIZE);
97
98
12
        let max_chunk_size = if max_chunk_size % CHUNK_MULTIPLE != 0 {
  Branch (98:33): [True: 0, False: 0]
  Branch (98:33): [True: 12, False: 0]
  Branch (98:33): [Folded - Ignored]
99
12
            ((max_chunk_size + CHUNK_MULTIPLE / 2) / CHUNK_MULTIPLE) * CHUNK_MULTIPLE
100
        } else {
101
0
            max_chunk_size
102
        };
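
Aside: a worked example of the rounding above. A requested chunk size is
rounded to the nearest multiple of 256 KiB (sizes here are illustrative):

    const CHUNK_MULTIPLE: usize = 256 * 1024;

    // Round to the nearest multiple by adding half a multiple, then truncating.
    fn round_chunk(size: usize) -> usize {
        ((size + CHUNK_MULTIPLE / 2) / CHUNK_MULTIPLE) * CHUNK_MULTIPLE
    }

    fn main() {
        assert_eq!(round_chunk(300 * 1024), 256 * 1024); // rounds down
        assert_eq!(round_chunk(400 * 1024), 512 * 1024); // rounds up
        assert_eq!(round_chunk(512 * 1024), 512 * 1024); // already a multiple
    }
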
103
104
12
        let max_retry_buffer_size = spec
105
12
            .common
106
12
            .max_retry_buffer_per_request
107
12
            .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST);
108
109
        // The retry buffer should be at least as big as the chunk size.
110
12
        let max_retry_buffer_size = if max_retry_buffer_size < max_chunk_size {
  Branch (110:40): [True: 0, False: 0]
  Branch (110:40): [True: 0, False: 12]
  Branch (110:40): [Folded - Ignored]
111
0
            max_chunk_size
112
        } else {
113
12
            max_retry_buffer_size
114
        };
115
116
12
        Ok(Arc::new(Self {
117
12
            client,
118
12
            now_fn,
119
12
            bucket: spec.bucket.clone(),
120
12
            key_prefix: spec
121
12
                .common
122
12
                .key_prefix
123
12
                .as_ref()
124
12
                .unwrap_or(&String::new())
125
12
                .clone(),
126
12
            retrier: Retrier::new(
127
12
                Arc::new(|duration| Box::pin(sleep(duration))),
128
12
                jitter_fn,
129
12
                spec.common.retry.clone(),
130
            ),
131
12
            consider_expired_after_s: i64::from(spec.common.consider_expired_after_s),
132
12
            max_retry_buffer_size,
133
12
            max_chunk_size,
134
12
            max_concurrent_uploads: max_connections,
135
        }))
136
12
    }
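
Aside: `new_with_ops` exists so tests can inject a mock `GcsOperations`. A
sketch of that injection pattern using a hypothetical, simplified trait (the
real `GcsOperations` trait has more methods and is async):

    use std::sync::Arc;

    trait Ops: Send + Sync {
        fn object_exists(&self, path: &str) -> bool;
    }

    // Test double returning canned answers.
    struct MockOps;
    impl Ops for MockOps {
        fn object_exists(&self, _path: &str) -> bool {
            true
        }
    }

    struct Store {
        client: Arc<dyn Ops>,
    }

    fn main() {
        // Tests inject the mock; production injects the real client.
        let store = Store { client: Arc::new(MockOps) };
        assert!(store.client.object_exists("some/key"));
    }
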
137
138
8
    async fn has(self: Pin<&Self>, key: &StoreKey<'_>) -> Result<Option<u64>, Error> {
139
8
        let object_path = self.make_object_path(key);
140
8
        let client = &self.client;
141
8
        let consider_expired_after_s = self.consider_expired_after_s;
142
8
        let now_fn = &self.now_fn;
143
144
8
        self.retrier
145
8
            .retry(unfold(object_path, move |object_path| async move {
146
8
                match client.read_object_metadata(&object_path).await.err_tip(|| {
147
1
                    format!(
148
1
                        "Error while trying to read - bucket: {} path: {}",
149
                        object_path.bucket, object_path.path
150
                    )
151
1
                }) {
152
4
                    Ok(Some(metadata)) => {
153
4
                        if consider_expired_after_s != 0 {
  Branch (153:28): [True: 0, False: 0]
  Branch (153:28): [True: 2, False: 2]
  Branch (153:28): [Folded - Ignored]
154
2
                            if let Some(update_time) = &metadata.update_time {
  Branch (154:36): [True: 0, False: 0]
  Branch (154:36): [True: 2, False: 0]
  Branch (154:36): [Folded - Ignored]
155
2
                                let now_s = now_fn().unix_timestamp() as i64;
156
2
                                if update_time.seconds + consider_expired_after_s <= now_s {
  Branch (156:36): [True: 0, False: 0]
  Branch (156:36): [True: 1, False: 1]
  Branch (156:36): [Folded - Ignored]
157
1
                                    return Some((RetryResult::Ok(None), object_path));
158
1
                                }
159
0
                            }
160
2
                        }
161
162
3
                        if metadata.size >= 0 {
  Branch (162:28): [True: 0, False: 0]
  Branch (162:28): [True: 3, False: 0]
  Branch (162:28): [Folded - Ignored]
163
3
                            Some((RetryResult::Ok(Some(metadata.size as u64)), object_path))
164
                        } else {
165
0
                            Some((
166
0
                                RetryResult::Err(make_err!(
167
0
                                    Code::InvalidArgument,
168
0
                                    "Invalid metadata size in GCS: {}",
169
0
                                    metadata.size
170
0
                                )),
171
0
                                object_path,
172
0
                            ))
173
                        }
174
                    }
175
3
                    Ok(None) => Some((RetryResult::Ok(None), object_path)),
176
1
                    Err(e) if e.code == Code::NotFound => {
  Branch (176:31): [True: 0, False: 0]
  Branch (176:31): [True: 0, False: 1]
  Branch (176:31): [Folded - Ignored]
177
0
                        Some((RetryResult::Ok(None), object_path))
178
                    }
179
1
                    Err(e) => Some((RetryResult::Retry(e), object_path)),
180
                }
181
16
            }))
182
8
            .await
183
8
    }
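
Aside: the expiry check in `has` treats an object as missing once
update_time + consider_expired_after_s is at or before "now", and a window of
zero disables the check. A sketch with illustrative timestamps:

    fn is_expired(update_time_s: i64, window_s: i64, now_s: i64) -> bool {
        window_s != 0 && update_time_s + window_s <= now_s
    }

    fn main() {
        // Uploaded at t=1000 with a 3600s window: expired from t=4600 onward.
        assert!(!is_expired(1000, 3600, 4599));
        assert!(is_expired(1000, 3600, 4600));
        // A window of 0 disables the expiry check entirely.
        assert!(!is_expired(1000, 0, 999_999));
    }
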
184
185
12
    fn make_object_path(&self, key: &StoreKey) -> ObjectPath {
186
12
        ObjectPath::new(
187
12
            self.bucket.clone(),
188
12
            &format!("{}{}", self.key_prefix, key.as_str()),
189
        )
190
12
    }
191
}
192
193
#[async_trait]
194
impl<I, NowFn> StoreDriver for GcsStore<NowFn>
195
where
196
    I: InstantWrapper,
197
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
198
{
199
    async fn has_with_results(
200
        self: Pin<&Self>,
201
        keys: &[StoreKey<'_>],
202
        results: &mut [Option<u64>],
203
14
    ) -> Result<(), Error> {
204
7
        keys.iter()
205
7
            .zip(results.iter_mut())
206
9
            .map(|(key, result)| async move {
207
9
                if is_zero_digest(key.borrow()) {
  Branch (207:20): [True: 0, False: 0]
  Branch (207:20): [True: 1, False: 8]
  Branch (207:20): [Folded - Ignored]
208
1
                    *result = Some(0);
209
1
                    return Ok(());
210
8
                }
211
8
                *result = self.has(key).await?;
212
7
                Ok(())
213
18
            })
214
7
            .collect::<FuturesUnordered<_>>()
215
7
            .try_collect()
216
7
            .await
217
14
    }
218
219
    async fn update(
220
        self: Pin<&Self>,
221
        digest: StoreKey<'_>,
222
        mut reader: DropCloserReadHalf,
223
        upload_size: UploadSizeInfo,
224
4
    ) -> Result<(), Error> {
225
2
        let object_path = self.make_object_path(&digest);
226
227
2
        reader.set_max_recent_data_size(
228
2
            u64::try_from(self.max_retry_buffer_size)
229
2
                .err_tip(|| "Could not convert max_retry_buffer_size to u64")
?0
,
230
        );
231
232
        // For small files with a known exact size, we'll use a simple upload.
233
2
        if let UploadSizeInfo::ExactSize(size) = upload_size {
  Branch (233:16): [True: 0, False: 0]
  Branch (233:16): [True: 2, False: 0]
  Branch (233:16): [Folded - Ignored]
234
2
            if size < MIN_MULTIPART_SIZE {
  Branch (234:16): [True: 0, False: 0]
  Branch (234:16): [True: 1, False: 1]
  Branch (234:16): [Folded - Ignored]
235
1
                let content = reader.consume(Some(size as usize)).await?;
236
1
                let client = &self.client;
237
238
1
                return self
239
1
                    .retrier
240
1
                    .retry(unfold(content, |content| async {
241
1
                        match client.write_object(&object_path, content.to_vec()).await {
242
1
                            Ok(()) => Some((RetryResult::Ok(()), content)),
243
0
                            Err(e) => Some((RetryResult::Retry(e), content)),
244
                        }
245
2
                    }))
246
1
                    .await;
247
1
            }
248
0
        }
249
250
        // For larger files, we'll use a resumable upload (see the sketch after this method).
251
        // Stream and upload data in chunks
252
1
        let mut offset = 0u64;
253
1
        let mut total_size = if let UploadSizeInfo::ExactSize(size) = upload_size {
  Branch (253:37): [True: 0, False: 0]
  Branch (253:37): [True: 1, False: 0]
  Branch (253:37): [Folded - Ignored]
254
1
            Some(size)
255
        } else {
256
0
            None
257
        };
258
1
        let mut upload_id: Option<String> = None;
259
1
        let client = &self.client;
260
261
        loop {
262
7
            let chunk = reader.consume(Some(self.max_chunk_size)).await?;
263
7
            if chunk.is_empty() {
  Branch (263:16): [True: 0, False: 0]
  Branch (263:16): [True: 1, False: 6]
  Branch (263:16): [Folded - Ignored]
264
1
                break;
265
6
            }
266
            // If a full chunk wasn't read, then this is the full length.
267
6
            if chunk.len() < self.max_chunk_size {
  Branch (267:16): [True: 0, False: 0]
  Branch (267:16): [True: 1, False: 5]
  Branch (267:16): [Folded - Ignored]
268
1
                total_size = Some(offset + chunk.len() as u64);
269
5
            }
270
271
6
            let upload_id_ref = if let Some(upload_id_ref) = &upload_id {
  Branch (271:40): [True: 0, False: 0]
  Branch (271:40): [True: 5, False: 1]
  Branch (271:40): [Folded - Ignored]
272
5
                upload_id_ref
273
            } else {
274
                // Initiate the upload session on the first non-empty chunk.
275
1
                upload_id = Some(
276
1
                    self.retrier
277
1
                        .retry(unfold((), |()| async {
278
1
                            match client.start_resumable_write(&object_path).await {
279
1
                                Ok(id) => Some((RetryResult::Ok(id), ())),
280
0
                                Err(e) => Some((
281
0
                                    RetryResult::Retry(make_err!(
282
0
                                        Code::Aborted,
283
0
                                        "Failed to start resumable upload: {:?}",
284
0
                                        e
285
0
                                    )),
286
0
                                    (),
287
0
                                )),
288
                            }
289
2
                        }))
290
1
                        .await?,
291
                );
292
1
                upload_id.as_deref().unwrap()
293
            };
294
295
6
            let current_offset = offset;
296
6
            offset += chunk.len() as u64;
297
298
            // Upload the chunk, retrying on failure.
299
6
            let object_path_ref = &object_path;
300
6
            self.retrier
301
6
                .retry(unfold(chunk, |chunk| async move {
302
6
                    match client
303
6
                        .upload_chunk(
304
6
                            upload_id_ref,
305
6
                            object_path_ref,
306
6
                            chunk.clone(),
307
6
                            current_offset,
308
6
                            offset,
309
6
                            total_size,
310
6
                        )
311
6
                        .await
312
                    {
313
6
                        Ok(()) => Some((RetryResult::Ok(()), chunk)),
314
0
                        Err(e) => Some((RetryResult::Retry(e), chunk)),
315
                    }
316
12
                }))
317
6
                .await?;
318
        }
319
320
        // Handle the case that the stream was of unknown length and
321
        // happened to be an exact multiple of chunk size.
322
1
        if let Some(upload_id_ref) = &upload_id {
  Branch (322:16): [True: 0, False: 0]
  Branch (322:16): [True: 1, False: 0]
  Branch (322:16): [Folded - Ignored]
323
1
            if total_size.is_none() {
  Branch (323:16): [True: 0, False: 0]
  Branch (323:16): [True: 0, False: 1]
  Branch (323:16): [Folded - Ignored]
324
0
                let object_path_ref = &object_path;
325
0
                self.retrier
326
0
                    .retry(unfold((), |()| async move {
327
0
                        match client
328
0
                            .upload_chunk(
329
0
                                upload_id_ref,
330
0
                                object_path_ref,
331
0
                                Bytes::new(),
332
0
                                offset,
333
0
                                offset,
334
0
                                Some(offset),
335
0
                            )
336
0
                            .await
337
                        {
338
0
                            Ok(()) => Some((RetryResult::Ok(()), ())),
339
0
                            Err(e) => Some((RetryResult::Retry(e), ())),
340
                        }
341
0
                    }))
342
0
                    .await?;
343
1
            }
344
        } else {
345
            // Handle streamed empty file.
346
0
            return self
347
0
                .retrier
348
0
                .retry(unfold((), |()| async {
349
0
                    match client.write_object(&object_path, Vec::new()).await {
350
0
                        Ok(()) => Some((RetryResult::Ok(()), ())),
351
0
                        Err(e) => Some((RetryResult::Retry(e), ())),
352
                    }
353
0
                }))
354
0
                .await;
355
        }
356
357
        // Verify that the upload completed successfully.
358
1
        self.retrier
359
1
            .retry(unfold((), |()| async {
360
1
                match client.object_exists(&object_path).await {
361
1
                    Ok(true) => Some((RetryResult::Ok(()), ())),
362
0
                    Ok(false) => Some((
363
0
                        RetryResult::Retry(make_err!(
364
0
                            Code::Internal,
365
0
                            "Object not found after upload completion"
366
0
                        )),
367
0
                        (),
368
0
                    )),
369
0
                    Err(e) => Some((RetryResult::Retry(e), ())),
370
                }
371
2
            }))
372
1
            .await?;
373
374
1
        Ok(())
375
4
    }
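
Aside: the resumable-upload loop above advances `offset` per chunk and leaves
`total_size` as None until a short (final) read reveals the stream length. A
sketch of that bookkeeping for the unknown-length case (sizes illustrative):

    // Returns (start, end, total_size) per chunk, mirroring the arguments
    // passed to upload_chunk above.
    fn chunk_plan(data_len: u64, max_chunk: u64) -> Vec<(u64, u64, Option<u64>)> {
        let mut plan = Vec::new();
        let mut offset = 0u64;
        let mut total_size = None;
        while offset < data_len {
            let len = max_chunk.min(data_len - offset);
            if len < max_chunk {
                total_size = Some(offset + len); // short read: stream ends here
            }
            plan.push((offset, offset + len, total_size));
            offset += len;
        }
        plan
    }

    fn main() {
        // 10 bytes in 4-byte chunks: the 2-byte tail fixes the total at 10.
        assert_eq!(
            chunk_plan(10, 4),
            vec![(0, 4, None), (4, 8, None), (8, 10, Some(10))]
        );
    }
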
376
377
    async fn get_part(
378
        self: Pin<&Self>,
379
        key: StoreKey<'_>,
380
        writer: &mut DropCloserWriteHalf,
381
        offset: u64,
382
        length: Option<u64>,
383
6
    ) -> Result<(), Error> {
384
3
        if is_zero_digest(key.borrow()) {
  Branch (384:12): [True: 0, False: 0]
  Branch (384:12): [True: 1, False: 2]
  Branch (384:12): [Folded - Ignored]
385
1
            writer.send_eof()?;
386
1
            return Ok(());
387
2
        }
388
389
2
        let object_path = self.make_object_path(&key);
390
2
        let end_offset = length.map(|len| offset + len);
391
2
        let client = &self.client;
392
393
2
        let object_path_ref = &object_path;
394
2
        self.retrier
395
2
            .retry(unfold(
396
2
                (offset, writer),
397
2
                |(mut offset, writer)| async move {
398
2
                    let mut stream = match client
399
2
                        .read_object_content(object_path_ref, offset, end_offset)
400
2
                        .await
401
                    {
402
2
                        Ok(stream) => stream,
403
0
                        Err(e) if e.code == Code::NotFound => {
  Branch (403:35): [True: 0, False: 0]
  Branch (403:35): [True: 0, False: 0]
  Branch (403:35): [Folded - Ignored]
404
0
                            return Some((RetryResult::Err(e), (offset, writer)));
405
                        }
406
0
                        Err(e) => return Some((RetryResult::Retry(e), (offset, writer))),
407
                    };
408
409
4
                    while let Some(next_chunk) = stream.next().await {
  Branch (409:31): [True: 0, False: 0]
  Branch (409:31): [True: 2, False: 2]
  Branch (409:31): [Folded - Ignored]
410
2
                        match next_chunk {
411
2
                            Ok(bytes) => {
412
2
                                offset += bytes.len() as u64;
413
2
                                if let Err(err) = writer.send(bytes).await {
  Branch (413:40): [True: 0, False: 0]
  Branch (413:40): [True: 0, False: 2]
  Branch (413:40): [Folded - Ignored]
414
0
                                    return Some((RetryResult::Err(err), (offset, writer)));
415
2
                                }
416
                            }
417
0
                            Err(err) => return Some((RetryResult::Retry(err), (offset, writer))),
418
                        }
419
                    }
420
421
2
                    if let Err(err) = writer.send_eof() {
  Branch (421:28): [True: 0, False: 0]
  Branch (421:28): [True: 0, False: 2]
  Branch (421:28): [Folded - Ignored]
422
0
                        return Some((RetryResult::Err(err), (offset, writer)));
423
2
                    }
424
425
2
                    Some((RetryResult::Ok(()), (offset, writer)))
426
4
                },
427
            ))
428
2
            .await
429
6
    }
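
Aside: `get_part` derives a half-open byte range from `offset` and the
optional `length`; when `length` is None the read runs to the end of the
object. A minimal sketch:

    fn read_range(offset: u64, length: Option<u64>) -> (u64, Option<u64>) {
        (offset, length.map(|len| offset + len))
    }

    fn main() {
        assert_eq!(read_range(100, Some(50)), (100, Some(150))); // bytes 100..150
        assert_eq!(read_range(100, None), (100, None));          // bytes 100..EOF
    }
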
430
431
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
432
0
        self
433
0
    }
434
435
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
436
0
        self
437
0
    }
438
439
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
440
0
        self
441
0
    }
442
443
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
444
0
        registry.register_indicator(self);
445
0
    }
446
}
447
448
#[async_trait]
449
impl<I, NowFn> HealthStatusIndicator for GcsStore<NowFn>
450
where
451
    I: InstantWrapper,
452
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
453
{
454
0
    fn get_name(&self) -> &'static str {
455
0
        "GcsStore"
456
0
    }
457
458
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
459
0
        StoreDriver::check_health(Pin::new(self), namespace).await
460
0
    }
461
}