Coverage Report

Created: 2025-10-18 23:40

/build/source/nativelink-store/src/gcs_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::Debug;
16
use core::pin::Pin;
17
use std::borrow::Cow;
18
use std::sync::Arc;
19
20
use async_trait::async_trait;
21
use bytes::Bytes;
22
use futures::stream::{FuturesUnordered, unfold};
23
use futures::{StreamExt, TryStreamExt};
24
use nativelink_config::stores::ExperimentalGcsSpec;
25
use nativelink_error::{Code, Error, ResultExt, make_err};
26
use nativelink_metric::MetricsComponent;
27
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
28
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
29
use nativelink_util::instant_wrapper::InstantWrapper;
30
use nativelink_util::retry::{Retrier, RetryResult};
31
use nativelink_util::store_trait::{RemoveItemCallback, StoreDriver, StoreKey, UploadSizeInfo};
32
use rand::Rng;
33
use tokio::time::sleep;
34
35
use crate::cas_utils::is_zero_digest;
36
use crate::gcs_client::client::{GcsClient, GcsOperations};
37
use crate::gcs_client::types::{
38
    CHUNK_SIZE, DEFAULT_CONCURRENT_UPLOADS, DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST,
39
    MIN_MULTIPART_SIZE, ObjectPath,
40
};
41
42
#[derive(MetricsComponent, Debug)]
43
pub struct GcsStore<Client: GcsOperations, NowFn> {
44
    client: Arc<Client>,
45
    now_fn: NowFn,
46
    #[metric(help = "The bucket name for the GCS store")]
47
    bucket: String,
48
    #[metric(help = "The key prefix for the GCS store")]
49
    key_prefix: String,
50
    retrier: Retrier,
51
    #[metric(help = "The number of seconds to consider an object expired")]
52
    consider_expired_after_s: i64,
53
    #[metric(help = "The number of bytes to buffer for retrying requests")]
54
    max_retry_buffer_size: usize,
55
    #[metric(help = "The size of chunks for resumable uploads")]
56
    max_chunk_size: usize,
57
    #[metric(help = "The number of concurrent uploads allowed")]
58
    max_concurrent_uploads: usize,
59
}
60
61
impl<I, NowFn> GcsStore<GcsClient, NowFn>
62
where
63
    I: InstantWrapper,
64
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
65
{
66
0
    pub async fn new(spec: &ExperimentalGcsSpec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
67
0
        let client = Arc::new(GcsClient::new(spec).await?);
68
0
        Self::new_with_ops(spec, client, now_fn)
69
0
    }
70
}
71
72
impl<I, Client, NowFn> GcsStore<Client, NowFn>
73
where
74
    I: InstantWrapper,
75
    Client: GcsOperations + Send + Sync,
76
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
77
{
78
    // Primarily used for injecting a mock or real operations implementation
79
12
    pub fn new_with_ops(
80
12
        spec: &ExperimentalGcsSpec,
81
12
        client: Arc<Client>,
82
12
        now_fn: NowFn,
83
12
    ) -> Result<Arc<Self>, Error> {
84
        // Chunks must be a multiple of 256 KiB according to the documentation.
85
        const CHUNK_MULTIPLE: usize = 256 * 1024;
86
87
12
        let max_connections = spec
88
12
            .common
89
12
            .multipart_max_concurrent_uploads
90
12
            .unwrap_or(DEFAULT_CONCURRENT_UPLOADS);
91
92
12
        let jitter_amt = spec.common.retry.jitter;
93
12
        let jitter_fn = Arc::new(move |delay: tokio::time::Duration| {
94
0
            if jitter_amt == 0.0 {
  Branch (94:16): [True: 0, False: 0]
  Branch (94:16): [True: 0, False: 0]
  Branch (94:16): [Folded - Ignored]
95
0
                return delay;
96
0
            }
97
0
            delay.mul_f32(jitter_amt.mul_add(rand::rng().random::<f32>() - 0.5, 1.))
98
0
        });
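
A minimal sketch of the jitter arithmetic above, assuming the rand crate's rand::random and a hypothetical standalone apply_jitter helper in place of the closure: for jitter amount j, mul_add(r - 0.5, 1.0) with r drawn from [0, 1) scales the delay by a factor in [1 - j/2, 1 + j/2).

use std::time::Duration;

// Hypothetical standalone version of the jitter_fn closure above.
fn apply_jitter(delay: Duration, jitter_amt: f32) -> Duration {
    if jitter_amt == 0.0 {
        return delay;
    }
    delay.mul_f32(jitter_amt.mul_add(rand::random::<f32>() - 0.5, 1.0))
}

fn main() {
    // With jitter 0.5, a 100ms delay lands in roughly [75ms, 125ms).
    let base = Duration::from_millis(100);
    let jittered = apply_jitter(base, 0.5);
    assert!(jittered >= base.mul_f32(0.75) && jittered <= base.mul_f32(1.25));
}
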
99
100
12
        let max_chunk_size =
101
12
            core::cmp::min(spec.resumable_chunk_size.unwrap_or(CHUNK_SIZE), CHUNK_SIZE);
102
103
12
        let max_chunk_size = if max_chunk_size.is_multiple_of(CHUNK_MULTIPLE) {
  Branch (103:33): [True: 0, False: 0]
  Branch (103:33): [True: 0, False: 12]
  Branch (103:33): [Folded - Ignored]
104
0
            max_chunk_size
105
        } else {
106
12
            ((max_chunk_size + CHUNK_MULTIPLE / 2) / CHUNK_MULTIPLE) * CHUNK_MULTIPLE
107
        };
108
109
12
        let max_retry_buffer_size = spec
110
12
            .common
111
12
            .max_retry_buffer_per_request
112
12
            .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST);
113
114
        // The retry buffer should be at least as big as the chunk size.
115
12
        let max_retry_buffer_size = if max_retry_buffer_size < max_chunk_size {
  Branch (115:40): [True: 0, False: 0]
  Branch (115:40): [True: 0, False: 12]
  Branch (115:40): [Folded - Ignored]
116
0
            max_chunk_size
117
        } else {
118
12
            max_retry_buffer_size
119
        };
120
121
12
        Ok(Arc::new(Self {
122
12
            client,
123
12
            now_fn,
124
12
            bucket: spec.bucket.clone(),
125
12
            key_prefix: spec
126
12
                .common
127
12
                .key_prefix
128
12
                .as_ref()
129
12
                .unwrap_or(&String::new())
130
12
                .clone(),
131
12
            retrier: Retrier::new(
132
12
                Arc::new(|duration| Box::pin(sleep(duration))),
133
12
                jitter_fn,
134
12
                spec.common.retry.clone(),
135
            ),
136
12
            consider_expired_after_s: i64::from(spec.common.consider_expired_after_s),
137
12
            max_retry_buffer_size,
138
12
            max_chunk_size,
139
12
            max_concurrent_uploads: max_connections,
140
        }))
141
12
    }
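
Two sizing rules fixed by new_with_ops are easy to miss: the resumable chunk size is rounded to the nearest 256 KiB multiple (after being capped at CHUNK_SIZE), and the retry buffer is floored at one chunk so a failed chunk upload can always be replayed from the buffer. A standalone sketch of that arithmetic, using hypothetical configured values:

const CHUNK_MULTIPLE: usize = 256 * 1024;

// Round a requested chunk size to the nearest multiple of 256 KiB, as above.
fn round_chunk_size(requested: usize) -> usize {
    if requested.is_multiple_of(CHUNK_MULTIPLE) {
        requested
    } else {
        ((requested + CHUNK_MULTIPLE / 2) / CHUNK_MULTIPLE) * CHUNK_MULTIPLE
    }
}

fn main() {
    // 300 KiB rounds down to 256 KiB; 400 KiB rounds up to 512 KiB.
    let chunk = round_chunk_size(300 * 1024);
    assert_eq!(chunk, 256 * 1024);
    assert_eq!(round_chunk_size(400 * 1024), 512 * 1024);

    // A retry buffer smaller than one chunk is raised to the chunk size.
    let requested_buffer = 100 * 1024; // hypothetical configured value
    let max_retry_buffer_size = requested_buffer.max(chunk);
    assert_eq!(max_retry_buffer_size, chunk);
}
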
142
143
8
    async fn has(self: Pin<&Self>, key: &StoreKey<'_>) -> Result<Option<u64>, Error> {
144
8
        let object_path = self.make_object_path(key);
145
8
        let client = &self.client;
146
8
        let consider_expired_after_s = self.consider_expired_after_s;
147
8
        let now_fn = &self.now_fn;
148
149
8
        self.retrier
150
8
            .retry(unfold(object_path, move |object_path| async move {
151
8
                match client.read_object_metadata(&object_path).await.err_tip(|| {
152
1
                    format!(
153
1
                        "Error while trying to read - bucket: {} path: {}",
154
                        object_path.bucket, object_path.path
155
                    )
156
1
                }) {
157
4
                    Ok(Some(metadata)) => {
158
4
                        if consider_expired_after_s != 0 {
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 2, False: 2]
  Branch (158:28): [Folded - Ignored]
159
2
                            if let Some(update_time) = &metadata.update_time {
  Branch (159:36): [True: 0, False: 0]
  Branch (159:36): [True: 2, False: 0]
  Branch (159:36): [Folded - Ignored]
160
2
                                let now_s = now_fn().unix_timestamp() as i64;
161
2
                                if update_time.seconds + consider_expired_after_s <= now_s {
  Branch (161:36): [True: 0, False: 0]
  Branch (161:36): [True: 1, False: 1]
  Branch (161:36): [Folded - Ignored]
162
1
                                    return Some((RetryResult::Ok(None), object_path));
163
1
                                }
164
0
                            }
165
2
                        }
166
167
3
                        if metadata.size >= 0 {
  Branch (167:28): [True: 0, False: 0]
  Branch (167:28): [True: 3, False: 0]
  Branch (167:28): [Folded - Ignored]
168
3
                            Some((RetryResult::Ok(Some(metadata.size as u64)), object_path))
169
                        } else {
170
0
                            Some((
171
0
                                RetryResult::Err(make_err!(
172
0
                                    Code::InvalidArgument,
173
0
                                    "Invalid metadata size in GCS: {}",
174
0
                                    metadata.size
175
0
                                )),
176
0
                                object_path,
177
0
                            ))
178
                        }
179
                    }
180
3
                    Ok(None) => Some((RetryResult::Ok(None), object_path)),
181
1
                    Err(e) if e.code == Code::NotFound => {
  Branch (181:31): [True: 0, False: 0]
  Branch (181:31): [True: 0, False: 1]
  Branch (181:31): [Folded - Ignored]
182
0
                        Some((RetryResult::Ok(None), object_path))
183
                    }
184
1
                    Err(e) => Some((RetryResult::Retry(e), object_path)),
185
                }
186
16
            }))
187
8
            .await
188
8
    }
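
has shows the retry idiom used for every GCS call in this file: the operation is wrapped in a futures unfold stream whose state (here the ObjectPath) is threaded from one attempt to the next, and each yielded RetryResult tells the Retrier whether to return or back off and poll again. A minimal sketch of the same shape against a hypothetical flaky_lookup, polled by hand instead of through Retrier::retry:

use futures::StreamExt;
use futures::stream::unfold;

// Hypothetical stand-in for a GCS call that fails twice, then succeeds.
async fn flaky_lookup(attempt: u32) -> Result<u64, &'static str> {
    if attempt < 2 { Err("transient error") } else { Ok(1024) }
}

#[tokio::main]
async fn main() {
    // Each stream item is one attempt; the state is handed to the next one.
    let stream = unfold(0u32, |attempt| async move {
        let result = flaky_lookup(attempt).await;
        Some((result, attempt + 1))
    });
    let mut stream = Box::pin(stream);
    let mut outcome = None;
    while let Some(result) = stream.next().await {
        if result.is_ok() {
            outcome = Some(result);
            break;
        }
    }
    assert_eq!(outcome, Some(Ok(1024)));
}
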
189
190
12
    fn make_object_path(&self, key: &StoreKey) -> ObjectPath {
191
12
        ObjectPath::new(
192
12
            self.bucket.clone(),
193
12
            &format!("{}{}", self.key_prefix, key.as_str()),
194
        )
195
12
    }
196
}
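
For reference, the object name built by make_object_path is just the configured key_prefix concatenated with the store key, resolved inside the configured bucket; the values below are hypothetical:

fn object_name(key_prefix: &str, key: &str) -> String {
    format!("{key_prefix}{key}")
}

fn main() {
    assert_eq!(object_name("cas/", "deadbeef-1024"), "cas/deadbeef-1024");
}
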
197
198
#[async_trait]
199
impl<I, Client, NowFn> StoreDriver for GcsStore<Client, NowFn>
200
where
201
    I: InstantWrapper,
202
    Client: GcsOperations + 'static,
203
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
204
{
205
    async fn has_with_results(
206
        self: Pin<&Self>,
207
        keys: &[StoreKey<'_>],
208
        results: &mut [Option<u64>],
209
7
    ) -> Result<(), Error> {
210
        keys.iter()
211
            .zip(results.iter_mut())
212
9
            .map(|(key, result)| async move {
213
9
                if is_zero_digest(key.borrow()) {
  Branch (213:20): [True: 0, False: 0]
  Branch (213:20): [True: 1, False: 8]
  Branch (213:20): [Folded - Ignored]
214
1
                    *result = Some(0);
215
1
                    return Ok(());
216
8
                }
217
8
                *result = self.has(key).await?;
218
7
                Ok(())
219
18
            })
220
            .collect::<FuturesUnordered<_>>()
221
            .try_collect()
222
            .await
223
7
    }
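
has_with_results fans out one future per key, zipped with its output slot, and drains them through FuturesUnordered so the lookups run concurrently while try_collect propagates the first error. A self-contained sketch of that shape with a hypothetical in-line lookup:

use futures::TryStreamExt;
use futures::stream::FuturesUnordered;

#[tokio::main]
async fn main() -> Result<(), &'static str> {
    let keys = ["a", "bb", "ccc"];
    let mut results = [None::<u64>; 3];
    keys.iter()
        .zip(results.iter_mut())
        .map(|(key, result)| async move {
            // Hypothetical lookup: report the key's length as the object size.
            *result = Some(key.len() as u64);
            Ok::<_, &'static str>(())
        })
        .collect::<FuturesUnordered<_>>()
        .try_collect::<()>()
        .await?;
    assert_eq!(results, [Some(1), Some(2), Some(3)]);
    Ok(())
}
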
224
225
    async fn update(
226
        self: Pin<&Self>,
227
        digest: StoreKey<'_>,
228
        mut reader: DropCloserReadHalf,
229
        upload_size: UploadSizeInfo,
230
2
    ) -> Result<(), Error> {
231
        let object_path = self.make_object_path(&digest);
232
233
        reader.set_max_recent_data_size(
234
            u64::try_from(self.max_retry_buffer_size)
235
                .err_tip(|| "Could not convert max_retry_buffer_size to u64")?,
236
        );
237
238
        // For small files with an exact size, we'll use a simple upload.
239
        if let UploadSizeInfo::ExactSize(size) = upload_size {
240
            if size < MIN_MULTIPART_SIZE {
241
                let content = reader.consume(Some(size as usize)).await?;
242
                let client = &self.client;
243
244
                return self
245
                    .retrier
246
1
                    .retry(unfold(content, |content| async {
247
1
                        match client.write_object(&object_path, content.to_vec()).await {
248
1
                            Ok(()) => Some((RetryResult::Ok(()), content)),
249
0
                            Err(e) => Some((RetryResult::Retry(e), content)),
250
                        }
251
2
                    }))
252
                    .await;
253
            }
254
        }
255
256
        // For larger files, we'll use a resumable upload,
257
        // streaming and uploading the data in chunks.
258
        let mut offset = 0u64;
259
        let mut total_size = if let UploadSizeInfo::ExactSize(size) = upload_size {
260
            Some(size)
261
        } else {
262
            None
263
        };
264
        let mut upload_id: Option<String> = None;
265
        let client = &self.client;
266
267
        loop {
268
            let chunk = reader.consume(Some(self.max_chunk_size)).await?;
269
            if chunk.is_empty() {
270
                break;
271
            }
272
            // If a full chunk wasn't read, then this is the full length.
273
            if chunk.len() < self.max_chunk_size {
274
                total_size = Some(offset + chunk.len() as u64);
275
            }
276
277
            let upload_id_ref = if let Some(upload_id_ref) = &upload_id {
278
                upload_id_ref
279
            } else {
280
                // Initiate the upload session on the first non-empty chunk.
281
                upload_id = Some(
282
                    self.retrier
283
1
                        .retry(unfold((), |()| async {
284
1
                            match client.start_resumable_write(&object_path).await {
285
1
                                Ok(id) => Some((RetryResult::Ok(id), ())),
286
0
                                Err(e) => Some((
287
0
                                    RetryResult::Retry(make_err!(
288
0
                                        Code::Aborted,
289
0
                                        "Failed to start resumable upload: {:?}",
290
0
                                        e
291
0
                                    )),
292
0
                                    (),
293
0
                                )),
294
                            }
295
2
                        }))
296
                        .await?,
297
                );
298
                upload_id.as_deref().unwrap()
299
            };
300
301
            let current_offset = offset;
302
            offset += chunk.len() as u64;
303
304
            // Uploading the chunk with a retry
305
            let object_path_ref = &object_path;
306
            self.retrier
307
6
                .retry(unfold(chunk, |chunk| async move {
308
6
                    match client
309
6
                        .upload_chunk(
310
6
                            upload_id_ref,
311
6
                            object_path_ref,
312
6
                            chunk.clone(),
313
6
                            current_offset,
314
6
                            offset,
315
6
                            total_size,
316
6
                        )
317
6
                        .await
318
                    {
319
6
                        Ok(()) => Some((RetryResult::Ok(()), chunk)),
320
0
                        Err(e) => Some((RetryResult::Retry(e), chunk)),
321
                    }
322
12
                }))
323
                .await?;
324
        }
325
326
        // Handle the case that the stream was of unknown length and
327
        // happened to be an exact multiple of chunk size.
328
        if let Some(upload_id_ref) = &upload_id {
329
            if total_size.is_none() {
330
                let object_path_ref = &object_path;
331
                self.retrier
332
0
                    .retry(unfold((), |()| async move {
333
0
                        match client
334
0
                            .upload_chunk(
335
0
                                upload_id_ref,
336
0
                                object_path_ref,
337
0
                                Bytes::new(),
338
0
                                offset,
339
0
                                offset,
340
0
                                Some(offset),
341
0
                            )
342
0
                            .await
343
                        {
344
0
                            Ok(()) => Some((RetryResult::Ok(()), ())),
345
0
                            Err(e) => Some((RetryResult::Retry(e), ())),
346
                        }
347
0
                    }))
348
                    .await?;
349
            }
350
        } else {
351
            // Handle a streamed empty file.
352
            return self
353
                .retrier
354
0
                .retry(unfold((), |()| async {
355
0
                    match client.write_object(&object_path, Vec::new()).await {
356
0
                        Ok(()) => Some((RetryResult::Ok(()), ())),
357
0
                        Err(e) => Some((RetryResult::Retry(e), ())),
358
                    }
359
0
                }))
360
                .await;
361
        }
362
363
        // Verify that the upload was successful.
364
        self.retrier
365
1
            .retry(unfold((), |()| async {
366
1
                match client.object_exists(&object_path).await {
367
1
                    Ok(true) => Some((RetryResult::Ok(()), ())),
368
0
                    Ok(false) => Some((
369
0
                        RetryResult::Retry(make_err!(
370
0
                            Code::Internal,
371
0
                            "Object not found after upload completion"
372
0
                        )),
373
0
                        (),
374
0
                    )),
375
0
                    Err(e) => Some((RetryResult::Retry(e), ())),
376
                }
377
2
            }))
378
            .await?;
379
380
        Ok(())
381
2
    }
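
The resumable-upload loop in update keeps three pieces of state: the running offset, the lazily created upload_id, and total_size, which stays None until a short read proves the stream ended (a stream that ends exactly on a chunk boundary keeps total_size as None, which is why the trailing zero-length commit chunk exists). A sketch of the offset/total_size bookkeeping over an in-memory buffer, using a hypothetical plan_chunks helper:

// (start_offset, end_offset, total_size) as passed to each upload_chunk call.
fn plan_chunks(data: &[u8], max_chunk_size: usize) -> Vec<(u64, u64, Option<u64>)> {
    let mut plan = Vec::new();
    let mut offset = 0u64;
    let mut total_size = None;
    for chunk in data.chunks(max_chunk_size) {
        // A short read means this is the final chunk, fixing the total length.
        if chunk.len() < max_chunk_size {
            total_size = Some(offset + chunk.len() as u64);
        }
        let start = offset;
        offset += chunk.len() as u64;
        plan.push((start, offset, total_size));
    }
    plan
}

fn main() {
    // Five bytes in two-byte chunks: only the one-byte tail knows the total.
    assert_eq!(
        plan_chunks(&[1, 2, 3, 4, 5], 2),
        vec![(0, 2, None), (2, 4, None), (4, 5, Some(5))]
    );
    // Four bytes end on a chunk boundary, so total_size is never learned here
    // and the store must finish with the empty commit chunk seen above.
    assert_eq!(plan_chunks(&[1, 2, 3, 4], 2), vec![(0, 2, None), (2, 4, None)]);
}
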
382
383
    async fn get_part(
384
        self: Pin<&Self>,
385
        key: StoreKey<'_>,
386
        writer: &mut DropCloserWriteHalf,
387
        offset: u64,
388
        length: Option<u64>,
389
3
    ) -> Result<(), Error> {
390
        if is_zero_digest(key.borrow()) {
391
            writer.send_eof()?;
392
            return Ok(());
393
        }
394
395
        let object_path = self.make_object_path(&key);
396
1
        let end_offset = length.map(|len| offset + len);
397
        let client = &self.client;
398
399
        let object_path_ref = &object_path;
400
        self.retrier
401
            .retry(unfold(
402
                (offset, writer),
403
2
                |(mut offset, writer)| async move {
404
2
                    let mut stream = match client
405
2
                        .read_object_content(object_path_ref, offset, end_offset)
406
2
                        .await
407
                    {
408
2
                        Ok(stream) => stream,
409
0
                        Err(e) if e.code == Code::NotFound => {
  Branch (409:35): [True: 0, False: 0]
  Branch (409:35): [True: 0, False: 0]
  Branch (409:35): [Folded - Ignored]
410
0
                            return Some((RetryResult::Err(e), (offset, writer)));
411
                        }
412
0
                        Err(e) => return Some((RetryResult::Retry(e), (offset, writer))),
413
                    };
414
415
4
                    while let Some(next_chunk) = stream.next().await {
  Branch (415:31): [True: 0, False: 0]
  Branch (415:31): [True: 2, False: 2]
  Branch (415:31): [Folded - Ignored]
416
2
                        match next_chunk {
417
2
                            Ok(bytes) => {
418
2
                                offset += bytes.len() as u64;
419
2
                                if let Err(err) = writer.send(bytes).await {
  Branch (419:40): [True: 0, False: 0]
  Branch (419:40): [True: 0, False: 2]
  Branch (419:40): [Folded - Ignored]
420
0
                                    return Some((RetryResult::Err(err), (offset, writer)));
421
2
                                }
422
                            }
423
0
                            Err(err) => return Some((RetryResult::Retry(err), (offset, writer))),
424
                        }
425
                    }
426
427
2
                    if let Err(err) = writer.send_eof() {
  Branch (427:28): [True: 0, False: 0]
  Branch (427:28): [True: 0, False: 2]
  Branch (427:28): [Folded - Ignored]
428
0
                        return Some((RetryResult::Err(err), (offset, writer)));
429
2
                    }
430
431
2
                    Some((RetryResult::Ok(()), (offset, writer)))
432
4
                },
433
            ))
434
            .await
435
3
    }
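
get_part maps NativeLink's (offset, length) request onto the half-open byte range [offset, offset + length) handed to read_object_content, where a None end means "read to the end of the object". The mapping, spelled out:

fn byte_range(offset: u64, length: Option<u64>) -> (u64, Option<u64>) {
    (offset, length.map(|len| offset + len))
}

fn main() {
    assert_eq!(byte_range(100, Some(50)), (100, Some(150)));
    assert_eq!(byte_range(100, None), (100, None)); // read to end of object
}
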
436
437
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
438
0
        self
439
0
    }
440
441
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
442
0
        self
443
0
    }
444
445
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
446
0
        self
447
0
    }
448
449
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
450
0
        registry.register_indicator(self);
451
0
    }
452
453
0
    fn register_remove_callback(
454
0
        self: Arc<Self>,
455
0
        _callback: Arc<dyn RemoveItemCallback>,
456
0
    ) -> Result<(), Error> {
457
        // As we're backed by GCS, this store never evicts items itself,
458
        // so remove callbacks can safely be ignored.
459
0
        Ok(())
460
0
    }
461
}
462
463
#[async_trait]
464
impl<I, Client, NowFn> HealthStatusIndicator for GcsStore<Client, NowFn>
465
where
466
    I: InstantWrapper,
467
    Client: GcsOperations + 'static,
468
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
469
{
470
0
    fn get_name(&self) -> &'static str {
471
0
        "GcsStore"
472
0
    }
473
474
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
475
        StoreDriver::check_health(Pin::new(self), namespace).await
476
0
    }
477
}