Coverage Report

Created: 2025-12-16 15:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/gcs_client/client.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::{Debug, Formatter};
16
use core::future::Future;
17
use core::time::Duration;
18
use std::collections::HashMap;
19
use std::sync::Arc;
20
21
use bytes::Bytes;
22
use futures::Stream;
23
use gcloud_auth::credentials::CredentialsFile;
24
use gcloud_storage::client::{Client, ClientConfig};
25
use gcloud_storage::http::Error as GcsError;
26
use gcloud_storage::http::objects::Object;
27
use gcloud_storage::http::objects::download::Range;
28
use gcloud_storage::http::objects::get::GetObjectRequest;
29
use gcloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};
30
use gcloud_storage::http::resumable_upload_client::{ChunkSize, UploadStatus};
31
use nativelink_config::stores::ExperimentalGcsSpec;
32
use nativelink_error::{Code, Error, ResultExt, make_err};
33
use nativelink_util::buf_channel::DropCloserReadHalf;
34
use rand::Rng;
35
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
36
use tokio::time::sleep;
37
38
use crate::gcs_client::types::{
39
    CHUNK_SIZE, DEFAULT_CONCURRENT_UPLOADS, DEFAULT_CONTENT_TYPE, GcsObject,
40
    INITIAL_UPLOAD_RETRY_DELAY_MS, MAX_UPLOAD_RETRIES, MAX_UPLOAD_RETRY_DELAY_MS, ObjectPath,
41
    SIMPLE_UPLOAD_THRESHOLD, Timestamp,
42
};
43
44
/// A trait that defines the required GCS operations.
45
/// This abstraction allows for easier testing by mocking GCS responses.
///
/// Every method returns a future instead of being declared `async fn` so the
/// `Send` bound can be spelled out on the returned future where it is needed.
46
pub trait GcsOperations: Send + Sync + Debug {
47
    /// Read metadata for a GCS object.
    ///
    /// The `Option` distinguishes "object is absent" from a failure: the
    /// `GcsClient` implementation resolves to `Ok(None)` when GCS answers with
    /// HTTP 404 and to `Err` for every other failure.
48
    fn read_object_metadata(
49
        &self,
50
        object: &ObjectPath,
51
    ) -> impl Future<Output = Result<Option<GcsObject>, Error>> + Send;
52
53
    /// Read the content of a GCS object, optionally with a range.
    ///
    /// `start` is the first byte offset to return and `end` is an optional
    /// upper bound; `None` reads to the end of the object.
    /// NOTE(review): whether `end` is inclusive is decided by the implementation
    /// and is not visible from this signature - confirm against callers.
54
    fn read_object_content(
55
        &self,
56
        object_path: &ObjectPath,
57
        start: u64,
58
        end: Option<u64>,
59
    ) -> impl Future<
60
        Output = Result<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + Unpin>, Error>,
61
    > + Send;
62
63
    /// Write object with simple upload (for smaller objects).
    ///
    /// The whole payload is handed over as one in-memory `Vec<u8>`.
64
    fn write_object(
65
        &self,
66
        object_path: &ObjectPath,
67
        content: Vec<u8>,
68
    ) -> impl Future<Output = Result<(), Error>> + Send;
69
70
    /// Start a resumable write operation and return the upload URL.
    ///
    /// The returned URL is the value later passed to `upload_chunk` as
    /// `upload_url`.
71
    fn start_resumable_write(
72
        &self,
73
        object_path: &ObjectPath,
74
    ) -> impl Future<Output = Result<String, Error>> + Send;
75
76
    /// Upload a chunk of data in a resumable upload session.
    ///
    /// `offset` is the position of the first byte of `data` within the object.
    /// The `GcsClient` implementation sends `end_offset - 1` as the last byte
    /// position (clamped at zero), i.e. it treats `end_offset` as exclusive,
    /// and it ignores `object_path`.
    /// `total_size` is the full object size when it is known.
77
    fn upload_chunk(
78
        &self,
79
        upload_url: &str,
80
        object_path: &ObjectPath,
81
        data: Bytes,
82
        offset: u64,
83
        end_offset: u64,
84
        total_size: Option<u64>,
85
    ) -> impl Future<Output = Result<(), Error>> + Send;
86
87
    /// Complete high-level operation to upload data from a reader.
    ///
    /// At most `max_size` bytes are consumed from `reader`. Unlike the other
    /// methods, the returned future carries no `Send` bound.
    /// The `GcsClient` implementation ignores `upload_id`, picks a simple or a
    /// resumable upload by comparing `max_size` with `SIMPLE_UPLOAD_THRESHOLD`,
    /// and retries transient failures after resetting the reader.
88
    fn upload_from_reader(
89
        &self,
90
        object_path: &ObjectPath,
91
        reader: &mut DropCloserReadHalf,
92
        upload_id: &str,
93
        max_size: u64,
94
    ) -> impl Future<Output = Result<(), Error>>;
95
96
    /// Check if an object exists.
    ///
    /// The `GcsClient` implementation answers this with a metadata lookup.
97
    fn object_exists(
98
        &self,
99
        object_path: &ObjectPath,
100
    ) -> impl Future<Output = Result<bool, Error>> + Send;
101
}
102
103
/// Main client for interacting with Google Cloud Storage.
///
/// Wraps the `gcloud_storage` client and bounds the number of concurrently
/// running GCS operations with a semaphore.
104
pub struct GcsClient {
105
    // Underlying `gcloud_storage` client used for every request.
    client: Client,
106
    // Upper bound, in bytes, on how much is pulled from the reader per read
    // while uploading.
    resumable_chunk_size: usize,
107
    // One permit is held for the duration of each GCS operation; streamed
    // reads keep their permit until the stream ends, fails, or is dropped.
    semaphore: Arc<Semaphore>,
108
}
109
110
impl GcsClient {
111
0
    fn create_client_config(spec: &ExperimentalGcsSpec) -> Result<ClientConfig, Error> {
112
0
        let mut client_config = ClientConfig::default();
113
0
        let connect_timeout = if spec.connection_timeout_s > 0 {
  Branch (113:34): [True: 0, False: 0]
  Branch (113:34): [Folded - Ignored]
114
0
            Duration::from_secs(spec.connection_timeout_s)
115
        } else {
116
0
            Duration::from_secs(3)
117
        };
118
0
        let read_timeout = if spec.read_timeout_s > 0 {
  Branch (118:31): [True: 0, False: 0]
  Branch (118:31): [Folded - Ignored]
119
0
            Duration::from_secs(spec.read_timeout_s)
120
        } else {
121
0
            Duration::from_secs(3)
122
        };
123
0
        let client = reqwest::ClientBuilder::new()
124
0
            .connect_timeout(connect_timeout)
125
0
            .read_timeout(read_timeout)
126
0
            .build()
127
0
            .map_err(|e| make_err!(Code::Internal, "Unable to create GCS client: {e:?}"))?;
128
0
        let mid_client = reqwest_middleware::ClientBuilder::new(client).build();
129
0
        client_config.http = Some(mid_client);
130
0
        Ok(client_config)
131
0
    }
132
133
    /// Create a new GCS client from the provided spec
134
0
    pub async fn new(spec: &ExperimentalGcsSpec) -> Result<Self, Error> {
135
        // Attempt to get the authentication from a file with the environment
136
        // variable GOOGLE_APPLICATION_CREDENTIALS or directly from the
137
        // environment in variable GOOGLE_APPLICATION_CREDENTIALS_JSON.  If that
138
        // fails, attempt to get authentication from the environment.
139
0
        let maybe_client_config = match CredentialsFile::new().await {
140
0
            Ok(credentials) => {
141
0
                Self::create_client_config(spec)?
142
0
                    .with_credentials(credentials)
143
0
                    .await
144
            }
145
0
            Err(_) => Self::create_client_config(spec)?.with_auth().await,
146
        }
147
0
        .map_err(|e| {
148
0
            make_err!(
149
0
                Code::Internal,
150
                "Failed to create client config with credentials: {e:?}"
151
            )
152
0
        });
153
154
        // If authentication is required then error, otherwise use anonymous.
155
0
        let client_config = if spec.authentication_required {
  Branch (155:32): [True: 0, False: 0]
  Branch (155:32): [Folded - Ignored]
156
0
            maybe_client_config.err_tip(|| "Authentication required and none found.")?
157
        } else {
158
0
            maybe_client_config
159
0
                .or_else(|_| Self::create_client_config(spec).map(ClientConfig::anonymous))?
160
        };
161
162
        // Creating client with the configured authentication
163
0
        let client = Client::new(client_config);
164
0
        let resumable_chunk_size = spec.resumable_chunk_size.unwrap_or(CHUNK_SIZE);
165
166
        // Get max connections from config
167
0
        let max_connections = spec
168
0
            .common
169
0
            .multipart_max_concurrent_uploads
170
0
            .unwrap_or(DEFAULT_CONCURRENT_UPLOADS);
171
172
0
        Ok(Self {
173
0
            client,
174
0
            resumable_chunk_size,
175
0
            semaphore: Arc::new(Semaphore::new(max_connections)),
176
0
        })
177
0
    }
178
179
    /// Generic method to execute operations with connection limiting
180
0
    async fn with_connection<F, Fut, T>(&self, operation: F) -> Result<T, Error>
181
0
    where
182
0
        F: FnOnce() -> Fut + Send,
183
0
        Fut: Future<Output = Result<T, Error>> + Send,
184
0
    {
185
0
        let permit =
186
0
            self.semaphore.acquire().await.map_err(|e| {
187
0
                make_err!(Code::Internal, "Failed to acquire connection permit: {}", e)
188
0
            })?;
189
190
0
        let result = operation().await;
191
0
        drop(permit);
192
0
        result
193
0
    }
194
195
    /// Convert GCS object to our internal representation
196
0
    fn convert_to_gcs_object(&self, obj: Object) -> GcsObject {
197
0
        let update_time = obj.updated.map(|dt| Timestamp {
198
0
            seconds: dt.unix_timestamp(),
199
            nanos: 0,
200
0
        });
201
202
        GcsObject {
203
0
            name: obj.name,
204
0
            bucket: obj.bucket,
205
0
            size: obj.size,
206
0
            content_type: obj
207
0
                .content_type
208
0
                .unwrap_or_else(|| DEFAULT_CONTENT_TYPE.to_string()),
209
0
            update_time,
210
        }
211
0
    }
212
213
    /// Handle error from GCS operations
214
0
    fn handle_gcs_error(err: &GcsError) -> Error {
215
0
        let code = match &err {
216
0
            GcsError::Response(resp) => match resp.code {
217
0
                404 => Code::NotFound,
218
0
                401 | 403 => Code::PermissionDenied,
219
0
                408 | 429 => Code::ResourceExhausted,
220
0
                500..=599 => Code::Unavailable,
221
0
                _ => Code::Unknown,
222
            },
223
0
            _ => Code::Internal,
224
        };
225
226
0
        make_err!(code, "GCS operation failed: {}", err)
227
0
    }
228
229
    /// Reading data from reader and upload in a single operation
230
0
    async fn read_and_upload_all(
231
0
        &self,
232
0
        object_path: &ObjectPath,
233
0
        reader: &mut DropCloserReadHalf,
234
0
        max_size: u64,
235
0
    ) -> Result<(), Error> {
236
0
        let initial_capacity = core::cmp::min(
237
0
            usize::try_from(max_size).unwrap_or(usize::MAX),
238
0
            10 * 1024 * 1024,
239
        );
240
0
        let mut data = Vec::with_capacity(initial_capacity);
241
0
        let max_size = usize::try_from(max_size).unwrap_or(usize::MAX);
242
0
        let mut total_size = 0usize;
243
244
0
        while total_size < max_size {
  Branch (244:15): [Folded - Ignored]
  Branch (244:15): [Folded - Ignored]
245
0
            let to_read = core::cmp::min(self.resumable_chunk_size, max_size - total_size);
246
0
            let chunk = reader.consume(Some(to_read)).await?;
247
248
0
            if chunk.is_empty() {
  Branch (248:16): [Folded - Ignored]
  Branch (248:16): [Folded - Ignored]
249
0
                break;
250
0
            }
251
252
0
            data.extend_from_slice(&chunk);
253
0
            total_size += chunk.len();
254
        }
255
256
0
        self.write_object(object_path, data).await
257
0
    }
258
259
    /// Implementing a resumable upload using the resumable upload API
260
0
    async fn try_resumable_upload(
261
0
        &self,
262
0
        object_path: &ObjectPath,
263
0
        reader: &mut DropCloserReadHalf,
264
0
        max_size: u64,
265
0
    ) -> Result<(), Error> {
266
0
        self.with_connection(|| async {
267
0
            let request = UploadObjectRequest {
268
0
                bucket: object_path.bucket.clone(),
269
0
                ..Default::default()
270
0
            };
271
272
0
            let mut metadata = HashMap::<String, String>::new();
273
0
            metadata.insert("name".to_string(), object_path.path.clone());
274
275
            // Use Multipart upload type with metadata
276
0
            let upload_type = UploadType::Multipart(Box::new(Object {
277
0
                name: object_path.path.clone(),
278
0
                content_type: Some(DEFAULT_CONTENT_TYPE.to_string()),
279
0
                metadata: Some(metadata),
280
0
                ..Default::default()
281
0
            }));
282
283
            // Prepare resumable upload
284
0
            let uploader = self
285
0
                .client
286
0
                .prepare_resumable_upload(&request, &upload_type)
287
0
                .await
288
0
                .map_err(|e| Self::handle_gcs_error(&e))?;
289
290
            // Upload data in chunks
291
0
            let mut offset: u64 = 0;
292
0
            let max_size = usize::try_from(max_size).unwrap_or(usize::MAX);
293
0
            let mut total_uploaded = 0usize;
294
295
0
            while total_uploaded < max_size {
  Branch (295:19): [Folded - Ignored]
  Branch (295:19): [Folded - Ignored]
296
0
                let to_read = core::cmp::min(self.resumable_chunk_size, max_size - total_uploaded);
297
0
                let chunk = reader.consume(Some(to_read)).await?;
298
299
0
                if chunk.is_empty() {
  Branch (299:20): [Folded - Ignored]
  Branch (299:20): [Folded - Ignored]
300
0
                    break;
301
0
                }
302
303
0
                let chunk_size = chunk.len() as u64;
304
0
                total_uploaded += chunk.len();
305
306
0
                let is_final = total_uploaded >= max_size || chunk.len() < to_read;
  Branch (306:32): [Folded - Ignored]
  Branch (306:32): [Folded - Ignored]
307
0
                let total_size = if is_final {
  Branch (307:37): [Folded - Ignored]
  Branch (307:37): [Folded - Ignored]
308
0
                    Some(offset + chunk_size)
309
                } else {
310
0
                    Some(max_size as u64)
311
                };
312
313
0
                let chunk_def = ChunkSize::new(offset, offset + chunk_size - 1, total_size);
314
315
                // Upload chunk
316
0
                let status = uploader
317
0
                    .upload_multiple_chunk(chunk, &chunk_def)
318
0
                    .await
319
0
                    .map_err(|e| Self::handle_gcs_error(&e))?;
320
321
                // Update offset for next chunk
322
0
                offset += chunk_size;
323
324
0
                if let UploadStatus::Ok(_) = status {
  Branch (324:24): [Folded - Ignored]
  Branch (324:24): [Folded - Ignored]
325
0
                    break;
326
0
                }
327
            }
328
329
            // If nothing was uploaded, finalizing with empty content
330
0
            if offset == 0 {
  Branch (330:16): [Folded - Ignored]
  Branch (330:16): [Folded - Ignored]
331
0
                let chunk_def = ChunkSize::new(0, 0, Some(0));
332
0
                uploader
333
0
                    .upload_multiple_chunk(Vec::new(), &chunk_def)
334
0
                    .await
335
0
                    .map_err(|e| Self::handle_gcs_error(&e))?;
336
0
            }
337
338
            // Check if the object exists
339
0
            match self.read_object_metadata(object_path).await? {
340
0
                Some(_) => Ok(()),
341
0
                None => Err(make_err!(
342
0
                    Code::Internal,
343
0
                    "Upload completed but object not found"
344
0
                )),
345
            }
346
0
        })
347
0
        .await
348
0
    }
349
}
350
351
impl Debug for GcsClient {
352
0
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
353
0
        f.debug_struct("GcsClient")
354
0
            .field("resumable_chunk_size", &self.resumable_chunk_size)
355
0
            .field("max_connections", &self.semaphore.available_permits())
356
0
            .finish_non_exhaustive()
357
0
    }
358
}
359
360
impl GcsOperations for GcsClient {
361
0
    async fn read_object_metadata(
362
0
        &self,
363
0
        object_path: &ObjectPath,
364
0
    ) -> Result<Option<GcsObject>, Error> {
365
0
        self.with_connection(|| async {
366
0
            let request = GetObjectRequest {
367
0
                bucket: object_path.bucket.clone(),
368
0
                object: object_path.path.clone(),
369
0
                ..Default::default()
370
0
            };
371
372
0
            match self.client.get_object(&request).await {
373
0
                Ok(obj) => Ok(Some(self.convert_to_gcs_object(obj))),
374
0
                Err(err) => {
375
0
                    if let GcsError::Response(resp) = &err {
  Branch (375:28): [True: 0, False: 0]
  Branch (375:28): [Folded - Ignored]
376
0
                        if resp.code == 404 {
  Branch (376:28): [True: 0, False: 0]
  Branch (376:28): [Folded - Ignored]
377
0
                            return Ok(None);
378
0
                        }
379
0
                    }
380
0
                    Err(Self::handle_gcs_error(&err))
381
                }
382
            }
383
0
        })
384
0
        .await
385
0
    }
386
387
0
    async fn read_object_content(
388
0
        &self,
389
0
        object_path: &ObjectPath,
390
0
        start: u64,
391
0
        end: Option<u64>,
392
0
    ) -> Result<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + Unpin>, Error> {
393
        type StreamItem = Result<Bytes, gcloud_storage::http::Error>;
394
        struct ReadStream<T: Stream<Item = StreamItem> + Send + Unpin> {
395
            stream: T,
396
            permit: Option<OwnedSemaphorePermit>,
397
        }
398
399
        impl<T: Stream<Item = StreamItem> + Send + Unpin> Stream for ReadStream<T> {
400
            type Item = Result<Bytes, Error>;
401
402
0
            fn poll_next(
403
0
                mut self: core::pin::Pin<&mut Self>,
404
0
                cx: &mut core::task::Context<'_>,
405
0
            ) -> core::task::Poll<Option<Self::Item>> {
406
0
                match std::pin::pin!(&mut self.stream).poll_next(cx) {
407
0
                    core::task::Poll::Ready(Some(Ok(bytes))) => {
408
0
                        core::task::Poll::Ready(Some(Ok(bytes)))
409
                    }
410
0
                    core::task::Poll::Ready(Some(Err(err))) => {
411
0
                        self.permit.take();
412
0
                        core::task::Poll::Ready(Some(Err(GcsClient::handle_gcs_error(&err))))
413
                    }
414
                    core::task::Poll::Ready(None) => {
415
0
                        self.permit.take();
416
0
                        core::task::Poll::Ready(None)
417
                    }
418
0
                    core::task::Poll::Pending => core::task::Poll::Pending,
419
                }
420
0
            }
421
422
0
            fn size_hint(&self) -> (usize, Option<usize>) {
423
0
                self.stream.size_hint()
424
0
            }
425
        }
426
427
0
        let permit =
428
0
            self.semaphore.clone().acquire_owned().await.map_err(|e| {
429
0
                make_err!(Code::Internal, "Failed to acquire connection permit: {}", e)
430
0
            })?;
431
0
        let request = GetObjectRequest {
432
0
            bucket: object_path.bucket.clone(),
433
0
            object: object_path.path.clone(),
434
0
            ..Default::default()
435
0
        };
436
437
0
        let start = (start > 0).then_some(start);
438
0
        let range = Range(start, end);
439
440
        // Download the object
441
0
        let stream = self
442
0
            .client
443
0
            .download_streamed_object(&request, &range)
444
0
            .await
445
0
            .map_err(|e| Self::handle_gcs_error(&e))?;
446
447
0
        Ok(Box::new(ReadStream {
448
0
            stream,
449
0
            permit: Some(permit),
450
0
        }))
451
0
    }
452
453
0
    async fn write_object(&self, object_path: &ObjectPath, content: Vec<u8>) -> Result<(), Error> {
454
0
        self.with_connection(|| async {
455
0
            let request = UploadObjectRequest {
456
0
                bucket: object_path.bucket.clone(),
457
0
                ..Default::default()
458
0
            };
459
460
0
            let media = Media::new(object_path.path.clone());
461
0
            let upload_type = UploadType::Simple(media);
462
463
0
            self.client
464
0
                .upload_object(&request, content, &upload_type)
465
0
                .await
466
0
                .map_err(|e| Self::handle_gcs_error(&e))?;
467
468
0
            Ok(())
469
0
        })
470
0
        .await
471
0
    }
472
473
0
    async fn start_resumable_write(&self, object_path: &ObjectPath) -> Result<String, Error> {
474
0
        self.with_connection(|| async {
475
0
            let request = UploadObjectRequest {
476
0
                bucket: object_path.bucket.clone(),
477
0
                ..Default::default()
478
0
            };
479
480
0
            let upload_type = UploadType::Multipart(Box::new(Object {
481
0
                name: object_path.path.clone(),
482
0
                content_type: Some(DEFAULT_CONTENT_TYPE.to_string()),
483
0
                ..Default::default()
484
0
            }));
485
486
            // Start resumable upload session
487
0
            let uploader = self
488
0
                .client
489
0
                .prepare_resumable_upload(&request, &upload_type)
490
0
                .await
491
0
                .map_err(|e| Self::handle_gcs_error(&e))?;
492
493
0
            Ok(uploader.url().to_string())
494
0
        })
495
0
        .await
496
0
    }
497
498
0
    async fn upload_chunk(
499
0
        &self,
500
0
        upload_url: &str,
501
0
        _object_path: &ObjectPath,
502
0
        data: Bytes,
503
0
        offset: u64,
504
0
        end_offset: u64,
505
0
        total_size: Option<u64>,
506
0
    ) -> Result<(), Error> {
507
0
        self.with_connection(|| async {
508
0
            let uploader = self.client.get_resumable_upload(upload_url.to_string());
509
510
0
            let last_byte = if end_offset == 0 { 0 } else { end_offset - 1 };
  Branch (510:32): [True: 0, False: 0]
  Branch (510:32): [Folded - Ignored]
511
0
            let chunk_def = ChunkSize::new(offset, last_byte, total_size);
512
513
            // Upload chunk
514
0
            uploader
515
0
                .upload_multiple_chunk(data, &chunk_def)
516
0
                .await
517
0
                .map_err(|e| Self::handle_gcs_error(&e))?;
518
519
0
            Ok(())
520
0
        })
521
0
        .await
522
0
    }
523
524
0
    async fn upload_from_reader(
525
0
        &self,
526
0
        object_path: &ObjectPath,
527
0
        reader: &mut DropCloserReadHalf,
528
0
        _upload_id: &str,
529
0
        max_size: u64,
530
0
    ) -> Result<(), Error> {
531
0
        let mut retry_count = 0;
532
0
        let mut retry_delay = INITIAL_UPLOAD_RETRY_DELAY_MS;
533
534
        loop {
535
0
            let result = if max_size < SIMPLE_UPLOAD_THRESHOLD {
  Branch (535:29): [Folded - Ignored]
  Branch (535:29): [Folded - Ignored]
536
0
                self.read_and_upload_all(object_path, reader, max_size)
537
0
                    .await
538
            } else {
539
0
                self.try_resumable_upload(object_path, reader, max_size)
540
0
                    .await
541
            };
542
543
0
            match result {
544
0
                Ok(()) => return Ok(()),
545
0
                Err(e) => {
546
0
                    let is_retriable = matches!(
547
0
                        e.code,
548
                        Code::Unavailable | Code::ResourceExhausted | Code::DeadlineExceeded
549
0
                    ) || (e.code == Code::Internal
  Branch (549:27): [Folded - Ignored]
  Branch (549:27): [Folded - Ignored]
550
0
                        && e.to_string().contains("connection"));
551
552
0
                    if !is_retriable || retry_count >= MAX_UPLOAD_RETRIES {
  Branch (552:24): [Folded - Ignored]
  Branch (552:41): [Folded - Ignored]
  Branch (552:24): [Folded - Ignored]
  Branch (552:41): [Folded - Ignored]
553
0
                        return Err(e);
554
0
                    }
555
556
0
                    if let Err(reset_err) = reader.try_reset_stream() {
  Branch (556:28): [Folded - Ignored]
  Branch (556:28): [Folded - Ignored]
557
0
                        return Err(e.merge(reset_err));
558
0
                    }
559
560
0
                    sleep(Duration::from_millis(retry_delay)).await;
561
0
                    retry_delay = core::cmp::min(retry_delay * 2, MAX_UPLOAD_RETRY_DELAY_MS);
562
563
0
                    let mut rng = rand::rng();
564
0
                    let jitter_factor = rng.random::<f64>().mul_add(0.4, 0.8);
565
0
                    retry_delay = Duration::from_millis(retry_delay)
566
0
                        .mul_f64(jitter_factor)
567
0
                        .as_millis()
568
0
                        .try_into()
569
0
                        .unwrap_or(u64::MAX);
570
571
0
                    retry_count += 1;
572
                }
573
            }
574
        }
575
0
    }
576
577
0
    /// Reports whether `object_path` exists, determined by a metadata lookup.
    ///
    /// # Errors
    ///
    /// Propagates any error from `read_object_metadata`.
    async fn object_exists(&self, object_path: &ObjectPath) -> Result<bool, Error> {
        self.read_object_metadata(object_path)
            .await
            .map(|metadata| metadata.is_some())
    }
581
}