Coverage Report

Created: 2025-05-08 18:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/gcs_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::Debug;
16
use core::pin::Pin;
17
use std::borrow::Cow;
18
use std::sync::Arc;
19
20
use async_trait::async_trait;
21
use bytes::Bytes;
22
use futures::TryStreamExt;
23
use futures::stream::{FuturesUnordered, unfold};
24
use nativelink_config::stores::ExperimentalGcsSpec;
25
use nativelink_error::{Code, Error, ResultExt, make_err};
26
use nativelink_metric::MetricsComponent;
27
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
28
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
29
use nativelink_util::instant_wrapper::InstantWrapper;
30
use nativelink_util::retry::{Retrier, RetryResult};
31
use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
32
use rand::Rng;
33
use tokio::time::sleep;
34
35
use crate::cas_utils::is_zero_digest;
36
use crate::gcs_client::client::{GcsClient, GcsOperations};
37
use crate::gcs_client::types::{
38
    CHUNK_SIZE, DEFAULT_CONCURRENT_UPLOADS, DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST,
39
    MIN_MULTIPART_SIZE, ObjectPath,
40
};
41
42
#[derive(MetricsComponent, Debug)]
43
pub struct GcsStore<NowFn> {
44
    client: Arc<dyn GcsOperations>,
45
    now_fn: NowFn,
46
    #[metric(help = "The bucket name for the GCS store")]
47
    bucket: String,
48
    #[metric(help = "The key prefix for the GCS store")]
49
    key_prefix: String,
50
    retrier: Retrier,
51
    #[metric(help = "The number of seconds to consider an object expired")]
52
    consider_expired_after_s: i64,
53
    #[metric(help = "The number of bytes to buffer for retrying requests")]
54
    max_retry_buffer_size: usize,
55
    #[metric(help = "The size of chunks for resumable uploads")]
56
    max_chunk_size: usize,
57
    #[metric(help = "The number of concurrent uploads allowed")]
58
    max_concurrent_uploads: usize,
59
}
60
61
impl<I, NowFn> GcsStore<NowFn>
62
where
63
    I: InstantWrapper,
64
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
65
{
66
0
    pub async fn new(spec: &ExperimentalGcsSpec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
67
0
        let client = GcsClient::new(spec).await?;
68
0
        let client: Arc<dyn GcsOperations> = Arc::new(client);
69
0
70
0
        Self::new_with_ops(spec, client, now_fn)
71
0
    }
72
73
    // Primarily used for injecting a mock or real operations implementation
74
11
    pub fn new_with_ops(
75
11
        spec: &ExperimentalGcsSpec,
76
11
        client: Arc<dyn GcsOperations>,
77
11
        now_fn: NowFn,
78
11
    ) -> Result<Arc<Self>, Error> {
79
11
        let max_connections = spec
80
11
            .common
81
11
            .multipart_max_concurrent_uploads
82
11
            .unwrap_or(DEFAULT_CONCURRENT_UPLOADS);
83
11
84
11
        let jitter_amt = spec.common.retry.jitter;
85
11
        let jitter_fn = Arc::new(move |delay: tokio::time::Duration| {
86
0
            if jitter_amt == 0.0 {
  Branch (86:16): [True: 0, False: 0]
  Branch (86:16): [True: 0, False: 0]
  Branch (86:16): [Folded - Ignored]
87
0
                return delay;
88
0
            }
89
0
            delay.mul_f32(jitter_amt.mul_add(rand::rng().random::<f32>() - 0.5, 1.))
90
0
        });
91
92
11
        Ok(Arc::new(Self {
93
11
            client,
94
11
            now_fn,
95
11
            bucket: spec.bucket.clone(),
96
11
            key_prefix: spec
97
11
                .common
98
11
                .key_prefix
99
11
                .as_ref()
100
11
                .unwrap_or(&String::new())
101
11
                .clone(),
102
11
            retrier: Retrier::new(
103
11
                Arc::new(|duration| 
Box::pin(sleep(duration))0
),
104
11
                jitter_fn,
105
11
                spec.common.retry.clone(),
106
11
            ),
107
11
            consider_expired_after_s: i64::from(spec.common.consider_expired_after_s),
108
11
            max_retry_buffer_size: spec
109
11
                .common
110
11
                .max_retry_buffer_per_request
111
11
                .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST),
112
11
            max_chunk_size: core::cmp::min(
113
11
                spec.resumable_chunk_size.unwrap_or(CHUNK_SIZE),
114
11
                CHUNK_SIZE,
115
11
            ),
116
11
            max_concurrent_uploads: max_connections,
117
11
        }))
118
11
    }
119
120
7
    async fn has(self: Pin<&Self>, key: &StoreKey<'_>) -> Result<Option<u64>, Error> {
121
7
        let object_path = self.make_object_path(key);
122
7
        let client = &self.client;
123
7
        let consider_expired_after_s = self.consider_expired_after_s;
124
7
        let now_fn = &self.now_fn;
125
7
126
7
        self.retrier
127
7
            .retry(unfold(object_path, move |object_path| async move {
128
7
                match client.read_object_metadata(&object_path).await {
129
4
                    Ok(Some(metadata)) => {
130
4
                        if consider_expired_after_s != 0 {
  Branch (130:28): [True: 0, False: 0]
  Branch (130:28): [True: 2, False: 2]
  Branch (130:28): [Folded - Ignored]
131
2
                            if let Some(update_time) = &metadata.update_time {
  Branch (131:36): [True: 0, False: 0]
  Branch (131:36): [True: 2, False: 0]
  Branch (131:36): [Folded - Ignored]
132
2
                                let now_s = now_fn().unix_timestamp() as i64;
133
2
                                if update_time.seconds + consider_expired_after_s <= now_s {
  Branch (133:36): [True: 0, False: 0]
  Branch (133:36): [True: 1, False: 1]
  Branch (133:36): [Folded - Ignored]
134
1
                                    return Some((RetryResult::Ok(None), object_path));
135
1
                                }
136
0
                            }
137
2
                        }
138
139
3
                        if metadata.size >= 0 {
  Branch (139:28): [True: 0, False: 0]
  Branch (139:28): [True: 3, False: 0]
  Branch (139:28): [Folded - Ignored]
140
3
                            Some((RetryResult::Ok(Some(metadata.size as u64)), object_path))
141
                        } else {
142
0
                            Some((
143
0
                                RetryResult::Err(make_err!(
144
0
                                    Code::InvalidArgument,
145
0
                                    "Invalid metadata size in GCS: {}",
146
0
                                    metadata.size
147
0
                                )),
148
0
                                object_path,
149
0
                            ))
150
                        }
151
                    }
152
3
                    Ok(None) => Some((RetryResult::Ok(None), object_path)),
153
0
                    Err(e) if e.code == Code::NotFound => {
  Branch (153:31): [True: 0, False: 0]
  Branch (153:31): [True: 0, False: 0]
  Branch (153:31): [Folded - Ignored]
154
0
                        Some((RetryResult::Ok(None), object_path))
155
                    }
156
0
                    Err(e) => Some((RetryResult::Retry(e), object_path)),
157
                }
158
14
            }))
159
7
            .await
160
7
    }
161
162
11
    fn make_object_path(&self, key: &StoreKey) -> ObjectPath {
163
11
        ObjectPath::new(
164
11
            self.bucket.clone(),
165
11
            &format!("{}{}", self.key_prefix, key.as_str()),
166
11
        )
167
11
    }
168
}
169
170
#[async_trait]
171
impl<I, NowFn> StoreDriver for GcsStore<NowFn>
172
where
173
    I: InstantWrapper,
174
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
175
{
176
    async fn has_with_results(
177
        self: Pin<&Self>,
178
        keys: &[StoreKey<'_>],
179
        results: &mut [Option<u64>],
180
12
    ) -> Result<(), Error> {
181
6
        keys.iter()
182
6
            .zip(results.iter_mut())
183
8
            .map(|(key, result)| async move {
184
8
                if is_zero_digest(key.borrow()) {
  Branch (184:20): [True: 0, False: 0]
  Branch (184:20): [True: 1, False: 7]
  Branch (184:20): [Folded - Ignored]
185
1
                    *result = Some(0);
186
1
                    return Ok(());
187
7
                }
188
7
                *result = self.has(key).await
?0
;
189
7
                Ok(())
190
16
            })
191
6
            .collect::<FuturesUnordered<_>>()
192
6
            .try_collect()
193
6
            .await
194
12
    }
195
196
    async fn update(
197
        self: Pin<&Self>,
198
        digest: StoreKey<'_>,
199
        mut reader: DropCloserReadHalf,
200
        upload_size: UploadSizeInfo,
201
4
    ) -> Result<(), Error> {
202
2
        let object_path = self.make_object_path(&digest);
203
2
        let max_size = match upload_size {
204
2
            UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(
sz0
) => sz,
205
2
        };
206
2
207
2
        reader.set_max_recent_data_size(
208
2
            u64::try_from(self.max_retry_buffer_size)
209
2
                .err_tip(|| 
"Could not convert max_retry_buffer_size to u64"0
)
?0
,
210
        );
211
212
        // For small files with exact size, we'll use simple upload
213
2
        if max_size < MIN_MULTIPART_SIZE && 
matches!0
(
upload_size1
, UploadSizeInfo::ExactSize(_)) {
  Branch (213:12): [True: 0, False: 0]
  Branch (213:12): [True: 1, False: 1]
  Branch (213:12): [Folded - Ignored]
214
1
            let content = reader.consume(Some(max_size as usize)).await
?0
;
215
1
            let client = &self.client;
216
1
            let object_path_cloned = object_path.clone();
217
1
218
1
            return self
219
1
                .retrier
220
1
                .retry(unfold(content, move |content| {
221
1
                    let object_path_cloned = object_path_cloned.clone();
222
1
                    async move {
223
1
                        match client
224
1
                            .write_object(&object_path_cloned, content.to_vec())
225
1
                            .await
226
                        {
227
1
                            Ok(()) => Some((RetryResult::Ok(()), content)),
228
0
                            Err(e) => Some((RetryResult::Retry(e), content)),
229
                        }
230
1
                    }
231
1
                }))
232
1
                .await;
233
1
        }
234
1
235
1
        // For larger files, we'll use resumable upload
236
1
        // First, we'll initiate the upload session
237
1
        let client = &self.client;
238
1
        let object_path_for_start = object_path.clone();
239
1
        let upload_id = self
240
1
            .retrier
241
1
            .retry(unfold((), move |()| {
242
1
                let object_path = object_path_for_start.clone();
243
1
                async move {
244
1
                    match client.start_resumable_write(&object_path).await {
245
1
                        Ok(id) => Some((RetryResult::Ok(id), ())),
246
0
                        Err(e) => Some((
247
0
                            RetryResult::Retry(make_err!(
248
0
                                Code::Aborted,
249
0
                                "Failed to start resumable upload: {:?}",
250
0
                                e
251
0
                            )),
252
0
                            (),
253
0
                        )),
254
                    }
255
1
                }
256
1
            }))
257
1
            .await
?0
;
258
259
        // Stream and upload data in chunks
260
1
        let chunk_size = core::cmp::min(self.max_chunk_size, max_size as usize);
261
1
        let mut offset = 0u64;
262
1
        let mut total_uploaded = 0u64;
263
1
264
1
        let upload_id = upload_id.clone();
265
1
        let object_path_for_chunks = object_path.clone();
266
267
        loop {
268
6
            let to_read = core::cmp::min(chunk_size, (max_size - total_uploaded) as usize);
269
6
            if to_read == 0 {
  Branch (269:16): [True: 0, False: 0]
  Branch (269:16): [True: 0, False: 6]
  Branch (269:16): [Folded - Ignored]
270
0
                break;
271
6
            }
272
273
6
            let chunk = reader.consume(Some(to_read)).await
?0
;
274
6
            if chunk.is_empty() {
  Branch (274:16): [True: 0, False: 0]
  Branch (274:16): [True: 0, False: 6]
  Branch (274:16): [Folded - Ignored]
275
0
                break;
276
6
            }
277
6
278
6
            let chunk_size = chunk.len() as u64;
279
6
            total_uploaded += chunk_size;
280
6
            let is_final = total_uploaded >= max_size || 
chunk.len() < to_read5
;
  Branch (280:28): [True: 0, False: 0]
  Branch (280:28): [True: 1, False: 5]
  Branch (280:28): [Folded - Ignored]
281
6
            let current_offset = offset;
282
6
            let object_path = object_path_for_chunks.clone();
283
6
            let upload_id_clone = upload_id.clone();
284
6
285
6
            // Uploading the chunk with a retry
286
6
            self.retrier
287
6
                .retry(unfold(chunk, move |chunk| {
288
6
                    let object_path = object_path.clone();
289
6
                    let upload_id_clone = upload_id_clone.clone();
290
6
                    async move {
291
6
                        match client
292
6
                            .upload_chunk(
293
6
                                &upload_id_clone,
294
6
                                &object_path,
295
6
                                chunk.to_vec(),
296
6
                                current_offset as i64,
297
6
                                (current_offset + chunk.len() as u64) as i64,
298
6
                                is_final,
299
6
                            )
300
6
                            .await
301
                        {
302
6
                            Ok(()) => Some((RetryResult::Ok(()), chunk)),
303
0
                            Err(e) => Some((RetryResult::Retry(e), chunk)),
304
                        }
305
6
                    }
306
6
                }))
307
6
                .await
?0
;
308
309
6
            offset += chunk_size;
310
6
311
6
            if is_final {
  Branch (311:16): [True: 0, False: 0]
  Branch (311:16): [True: 1, False: 5]
  Branch (311:16): [Folded - Ignored]
312
1
                break;
313
5
            }
314
        }
315
316
        // Handle the edge case: empty file (nothing uploaded)
317
1
        if offset == 0 {
  Branch (317:12): [True: 0, False: 0]
  Branch (317:12): [True: 0, False: 1]
  Branch (317:12): [Folded - Ignored]
318
0
            let object_path = object_path.clone();
319
0
            let upload_id_clone = upload_id.clone();
320
0
321
0
            self.retrier
322
0
                .retry(unfold((), move |()| {
323
0
                    let object_path = object_path.clone();
324
0
                    let upload_id_clone = upload_id_clone.clone();
325
0
                    async move {
326
0
                        match client
327
0
                            .upload_chunk(&upload_id_clone, &object_path, Vec::new(), 0, 0, true)
328
0
                            .await
329
                        {
330
0
                            Ok(()) => Some((RetryResult::Ok(()), ())),
331
0
                            Err(e) => Some((RetryResult::Retry(e), ())),
332
                        }
333
0
                    }
334
0
                }))
335
0
                .await?;
336
1
        }
337
338
        // Verifying if the upload was successful
339
1
        let object_path = object_path.clone();
340
1
341
1
        self.retrier
342
1
            .retry(unfold((), move |()| {
343
1
                let object_path = object_path.clone();
344
1
                async move {
345
1
                    match client.object_exists(&object_path).await {
346
1
                        Ok(true) => Some((RetryResult::Ok(()), ())),
347
0
                        Ok(false) => Some((
348
0
                            RetryResult::Retry(make_err!(
349
0
                                Code::Internal,
350
0
                                "Object not found after upload completion"
351
0
                            )),
352
0
                            (),
353
0
                        )),
354
0
                        Err(e) => Some((RetryResult::Retry(e), ())),
355
                    }
356
1
                }
357
1
            }))
358
1
            .await
?0
;
359
360
1
        Ok(())
361
4
    }
362
363
    async fn get_part(
364
        self: Pin<&Self>,
365
        key: StoreKey<'_>,
366
        writer: &mut DropCloserWriteHalf,
367
        offset: u64,
368
        length: Option<u64>,
369
6
    ) -> Result<(), Error> {
370
3
        if is_zero_digest(key.borrow()) {
  Branch (370:12): [True: 0, False: 0]
  Branch (370:12): [True: 1, False: 2]
  Branch (370:12): [Folded - Ignored]
371
1
            writer.send_eof()
?0
;
372
1
            return Ok(());
373
2
        }
374
2
375
2
        let object_path = self.make_object_path(&key);
376
2
        let end_offset = length.map(|len| 
offset + len1
);
377
2
        let client = &self.client;
378
379
2
        let result = self
380
2
            .retrier
381
2
            .retry(unfold(
382
2
                (offset, end_offset, object_path.clone()),
383
2
                move |(start_offset, end_offset, object_path)| async move {
384
2
                    match client
385
2
                        .read_object_content(
386
2
                            &object_path,
387
2
                            start_offset as i64,
388
2
                            end_offset.map(|e| 
e as i641
),
389
                        )
390
2
                        .await
391
                    {
392
2
                        Ok(data) => Some((
393
2
                            RetryResult::Ok(data),
394
2
                            (start_offset, end_offset, object_path),
395
2
                        )),
396
0
                        Err(e) if e.code == Code::NotFound => {
  Branch (396:35): [True: 0, False: 0]
  Branch (396:35): [True: 0, False: 0]
  Branch (396:35): [Folded - Ignored]
397
0
                            Some((RetryResult::Err(e), (start_offset, end_offset, object_path)))
398
                        }
399
0
                        Err(e) => Some((
400
0
                            RetryResult::Retry(e),
401
0
                            (start_offset, end_offset, object_path),
402
0
                        )),
403
                    }
404
4
                },
405
            ))
406
2
            .await;
407
408
2
        match result {
409
2
            Ok(data) => {
410
2
                if !data.is_empty() {
  Branch (410:20): [True: 0, False: 0]
  Branch (410:20): [True: 2, False: 0]
  Branch (410:20): [Folded - Ignored]
411
2
                    writer.send(Bytes::from(data)).await
?0
;
412
0
                }
413
2
                writer.send_eof()
?0
;
414
2
                Ok(())
415
            }
416
0
            Err(e) => {
417
0
                drop(writer.send_eof());
418
0
                Err(e)
419
            }
420
        }
421
6
    }
422
423
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
424
0
        self
425
0
    }
426
427
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
428
0
        self
429
0
    }
430
431
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
432
0
        self
433
0
    }
434
435
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
436
0
        registry.register_indicator(self);
437
0
    }
438
}
439
440
#[async_trait]
441
impl<I, NowFn> HealthStatusIndicator for GcsStore<NowFn>
442
where
443
    I: InstantWrapper,
444
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
445
{
446
0
    fn get_name(&self) -> &'static str {
447
0
        "GcsStore"
448
0
    }
449
450
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
451
0
        StoreDriver::check_health(Pin::new(self), namespace).await
452
0
    }
453
}