Coverage Report

Created: 2025-07-10 19:59

Legend: next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/gcs_client/client.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::{Debug, Formatter};
16
use core::future::Future;
17
use core::time::Duration;
18
use std::collections::HashMap;
19
use std::path::PathBuf;
20
use std::sync::Arc;
21
22
use async_trait::async_trait;
23
use bytes::Bytes;
24
use futures::Stream;
25
use google_cloud_auth::credentials::CredentialsFile;
26
use google_cloud_storage::client::{Client, ClientConfig};
27
use google_cloud_storage::http::Error as GcsError;
28
use google_cloud_storage::http::objects::Object;
29
use google_cloud_storage::http::objects::download::Range;
30
use google_cloud_storage::http::objects::get::GetObjectRequest;
31
use google_cloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};
32
use google_cloud_storage::http::resumable_upload_client::{ChunkSize, UploadStatus};
33
use nativelink_config::stores::ExperimentalGcsSpec;
34
use nativelink_error::{Code, Error, make_err};
35
use nativelink_util::buf_channel::DropCloserReadHalf;
36
use rand::Rng;
37
use tokio::fs;
38
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
39
use tokio::time::sleep;
40
41
use crate::gcs_client::types::{
42
    CHUNK_SIZE, DEFAULT_CONCURRENT_UPLOADS, DEFAULT_CONTENT_TYPE, GcsObject,
43
    INITIAL_UPLOAD_RETRY_DELAY_MS, MAX_UPLOAD_RETRIES, MAX_UPLOAD_RETRY_DELAY_MS, ObjectPath,
44
    SIMPLE_UPLOAD_THRESHOLD, Timestamp,
45
};
46
47
/// A trait that defines the required GCS operations.
48
/// This abstraction allows for easier testing by mocking GCS responses.
49
#[async_trait]
50
pub trait GcsOperations: Send + Sync + Debug {
51
    /// Read metadata for a GCS object
52
    async fn read_object_metadata(&self, object: &ObjectPath) -> Result<Option<GcsObject>, Error>;
53
54
    /// Read the content of a GCS object, optionally with a range
55
    async fn read_object_content(
56
        &self,
57
        object_path: &ObjectPath,
58
        start: u64,
59
        end: Option<u64>,
60
    ) -> Result<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + Unpin>, Error>;
61
62
    /// Write object with simple upload (for smaller objects)
63
    async fn write_object(&self, object_path: &ObjectPath, content: Vec<u8>) -> Result<(), Error>;
64
65
    /// Start a resumable write operation and return the upload URL
66
    async fn start_resumable_write(&self, object_path: &ObjectPath) -> Result<String, Error>;
67
68
    /// Upload a chunk of data in a resumable upload session
69
    async fn upload_chunk(
70
        &self,
71
        upload_url: &str,
72
        object_path: &ObjectPath,
73
        data: Bytes,
74
        offset: u64,
75
        end_offset: u64,
76
        total_size: Option<u64>,
77
    ) -> Result<(), Error>;
78
79
    /// Complete high-level operation to upload data from a reader
80
    async fn upload_from_reader(
81
        &self,
82
        object_path: &ObjectPath,
83
        reader: &mut DropCloserReadHalf,
84
        upload_id: &str,
85
        max_size: u64,
86
    ) -> Result<(), Error>;
87
88
    /// Check if an object exists
89
    async fn object_exists(&self, object_path: &ObjectPath) -> Result<bool, Error>;
90
}
91
92
/// Main client for interacting with Google Cloud Storage
93
pub struct GcsClient {
94
    client: Client,
95
    resumable_chunk_size: usize,
96
    semaphore: Arc<Semaphore>,
97
}
98
99
impl GcsClient {
100
    /// Create a new GCS client from the provided spec
101
0
    pub async fn new(spec: &ExperimentalGcsSpec) -> Result<Self, Error> {
102
        // Creating default config without authentication initially
103
0
        let mut client_config = ClientConfig::default().anonymous();
104
0
        let mut auth_success = false;
105
106
        // Trying authentication with credentials file path from environment variable.
107
        // Google Cloud Auth expects the path to the credentials file to be specified
108
        // in the GOOGLE_APPLICATION_CREDENTIALS environment variable.
109
        // Check https://cloud.google.com/docs/authentication/application-default-credentials#GAC
110
        // for more details.
111
0
        if let Ok(creds_path) = std::env::var("GOOGLE_APPLICATION_CREDENTIALS") {
  Branch (111:16): [True: 0, False: 0]
  Branch (111:16): [Folded - Ignored]
112
0
            let path_str = PathBuf::from(creds_path).to_string_lossy().to_string();
113
114
0
            tracing::info!("Attempting to load credentials from: {}", path_str);
115
0
            match CredentialsFile::new_from_file(path_str.clone()).await {
116
0
                Ok(creds_file) => {
117
0
                    match ClientConfig::default().with_credentials(creds_file).await {
118
0
                        Ok(config) => {
119
0
                            tracing::info!("Successfully authenticated with credentials file");
120
0
                            client_config = config;
121
0
                            auth_success = true;
122
                        }
123
0
                        Err(e) => {
124
0
                            tracing::warn!("Failed to configure client with credentials: {}", e);
125
                        }
126
                    }
127
                }
128
0
                Err(e) => {
129
0
                    tracing::warn!("Failed to load credentials from file: {}", e);
130
                }
131
            }
132
0
        }
133
134
        // Trying JSON credentials in environment variable if first method failed
135
0
        if !auth_success {
  Branch (135:12): [True: 0, False: 0]
  Branch (135:12): [Folded - Ignored]
136
0
            if let Ok(creds_json) = std::env::var("GOOGLE_APPLICATION_CREDENTIALS_JSON") {
  Branch (136:20): [True: 0, False: 0]
  Branch (136:20): [Folded - Ignored]
137
0
                tracing::info!("Attempting to use credentials from environment variable");
138
139
                // Creating temporary file
140
0
                let temp_dir = std::env::temp_dir();
141
0
                let temp_path = temp_dir.join(format!("gcs-creds-{}.json", uuid::Uuid::new_v4()));
142
0
                let temp_path_str = temp_path.to_string_lossy().to_string();
143
144
                // Writing JSON to temporary file
145
0
                if let Err(e) = fs::write(&temp_path, creds_json).await {
  Branch (145:24): [True: 0, False: 0]
  Branch (145:24): [Folded - Ignored]
146
0
                    tracing::warn!("Failed to write credentials to temp file: {}", e);
147
                } else {
148
                    // Load credentials from temporary file
149
0
                    match CredentialsFile::new_from_file(temp_path_str).await {
150
0
                        Ok(creds_file) => {
151
0
                            match ClientConfig::default().with_credentials(creds_file).await {
152
0
                                Ok(config) => {
153
0
                                    tracing::info!(
154
0
                                        "Successfully authenticated with JSON credentials"
155
                                    );
156
0
                                    client_config = config;
157
0
                                    auth_success = true;
158
                                }
159
0
                                Err(e) => {
160
0
                                    tracing::warn!(
161
0
                                        "Failed to configure client with JSON credentials: {}",
162
                                        e
163
                                    );
164
                                }
165
                            }
166
                        }
167
0
                        Err(e) => {
168
0
                            tracing::warn!("Failed to load credentials from JSON: {}", e);
169
                        }
170
                    }
171
172
                    // Clean up temporary file
173
0
                    drop(fs::remove_file(temp_path).await);
174
                }
175
0
            }
176
0
        }
177
178
0
        if !auth_success {
  Branch (178:12): [True: 0, False: 0]
  Branch (178:12): [Folded - Ignored]
179
0
            match ClientConfig::default().with_auth().await {
180
0
                Ok(config) => {
181
0
                    tracing::info!(
182
0
                        "Successfully authenticated with application default credentials"
183
                    );
184
0
                    client_config = config;
185
                }
186
0
                Err(e) => {
187
0
                    tracing::warn!("Failed to use application default credentials: {}", e);
188
0
                    tracing::info!("Continuing with unauthenticated access");
189
                }
190
            }
191
0
        }
192
193
        // Creating client with the configured authentication
194
0
        let client = Client::new(client_config);
195
0
        let resumable_chunk_size = spec.resumable_chunk_size.unwrap_or(CHUNK_SIZE);
196
197
        // Get max connections from config
198
0
        let max_connections = spec
199
0
            .common
200
0
            .multipart_max_concurrent_uploads
201
0
            .unwrap_or(DEFAULT_CONCURRENT_UPLOADS);
202
203
0
        Ok(Self {
204
0
            client,
205
0
            resumable_chunk_size,
206
0
            semaphore: Arc::new(Semaphore::new(max_connections)),
207
0
        })
208
0
    }
209
210
    /// Generic method to execute operations with connection limiting
211
0
    async fn with_connection<F, Fut, T>(&self, operation: F) -> Result<T, Error>
212
0
    where
213
0
        F: FnOnce() -> Fut + Send,
214
0
        Fut: Future<Output = Result<T, Error>> + Send,
215
0
    {
216
0
        let permit =
217
0
            self.semaphore.acquire().await.map_err(|e| {
218
0
                make_err!(Code::Internal, "Failed to acquire connection permit: {}", e)
219
0
            })?;
220
221
0
        let result = operation().await;
222
0
        drop(permit);
223
0
        result
224
0
    }
225
226
    /// Convert GCS object to our internal representation
227
0
    fn convert_to_gcs_object(&self, obj: Object) -> GcsObject {
228
0
        let update_time = obj.updated.map(|dt| Timestamp {
229
0
            seconds: dt.unix_timestamp(),
230
            nanos: 0,
231
0
        });
232
233
        GcsObject {
234
0
            name: obj.name,
235
0
            bucket: obj.bucket,
236
0
            size: obj.size,
237
0
            content_type: obj
238
0
                .content_type
239
0
                .unwrap_or_else(|| DEFAULT_CONTENT_TYPE.to_string()),
240
0
            update_time,
241
        }
242
0
    }
243
244
    /// Handle error from GCS operations
245
0
    fn handle_gcs_error(err: &GcsError) -> Error {
246
0
        let code = match &err {
247
0
            GcsError::Response(resp) => match resp.code {
248
0
                404 => Code::NotFound,
249
0
                401 | 403 => Code::PermissionDenied,
250
0
                408 | 429 => Code::ResourceExhausted,
251
0
                500..=599 => Code::Unavailable,
252
0
                _ => Code::Unknown,
253
            },
254
0
            _ => Code::Internal,
255
        };
256
257
0
        make_err!(code, "GCS operation failed: {}", err)
258
0
    }
259
260
    /// Reading data from reader and upload in a single operation
261
0
    async fn read_and_upload_all(
262
0
        &self,
263
0
        object_path: &ObjectPath,
264
0
        reader: &mut DropCloserReadHalf,
265
0
        max_size: u64,
266
0
    ) -> Result<(), Error> {
267
0
        let initial_capacity = core::cmp::min(max_size as usize, 10 * 1024 * 1024);
268
0
        let mut data = Vec::with_capacity(initial_capacity);
269
0
        let max_size = max_size as usize;
270
0
        let mut total_size = 0usize;
271
272
0
        while total_size < max_size {
  Branch (272:15): [True: 0, False: 0]
  Branch (272:15): [Folded - Ignored]
273
0
            let to_read = core::cmp::min(self.resumable_chunk_size, max_size - total_size);
274
0
            let chunk = reader.consume(Some(to_read)).await?;
275
276
0
            if chunk.is_empty() {
  Branch (276:16): [True: 0, False: 0]
  Branch (276:16): [Folded - Ignored]
277
0
                break;
278
0
            }
279
280
0
            data.extend_from_slice(&chunk);
281
0
            total_size += chunk.len();
282
        }
283
284
0
        self.write_object(object_path, data).await
285
0
    }
286
287
    /// Implementing a resumable upload using the resumable upload API
288
0
    async fn try_resumable_upload(
289
0
        &self,
290
0
        object_path: &ObjectPath,
291
0
        reader: &mut DropCloserReadHalf,
292
0
        max_size: u64,
293
0
    ) -> Result<(), Error> {
294
0
        self.with_connection(|| async {
295
0
            let request = UploadObjectRequest {
296
0
                bucket: object_path.bucket.clone(),
297
0
                ..Default::default()
298
0
            };
299
300
0
            let mut metadata = HashMap::<String, String>::new();
301
0
            metadata.insert("name".to_string(), object_path.path.clone());
302
303
            // Use Multipart upload type with metadata
304
0
            let upload_type = UploadType::Multipart(Box::new(Object {
305
0
                name: object_path.path.clone(),
306
0
                content_type: Some(DEFAULT_CONTENT_TYPE.to_string()),
307
0
                metadata: Some(metadata),
308
0
                ..Default::default()
309
0
            }));
310
311
            // Prepare resumable upload
312
0
            let uploader = self
313
0
                .client
314
0
                .prepare_resumable_upload(&request, &upload_type)
315
0
                .await
316
0
                .map_err(|e| Self::handle_gcs_error(&e))?;
317
318
            // Upload data in chunks
319
0
            let mut offset: u64 = 0;
320
0
            let max_size = max_size as usize;
321
0
            let mut total_uploaded = 0usize;
322
323
0
            while total_uploaded < max_size {
  Branch (323:19): [True: 0, False: 0]
  Branch (323:19): [Folded - Ignored]
324
0
                let to_read = core::cmp::min(self.resumable_chunk_size, max_size - total_uploaded);
325
0
                let chunk = reader.consume(Some(to_read)).await?;
326
327
0
                if chunk.is_empty() {
  Branch (327:20): [True: 0, False: 0]
  Branch (327:20): [Folded - Ignored]
328
0
                    break;
329
0
                }
330
331
0
                let chunk_size = chunk.len() as u64;
332
0
                total_uploaded += chunk.len();
333
334
0
                let is_final = total_uploaded >= max_size || chunk.len() < to_read;
  Branch (334:32): [True: 0, False: 0]
  Branch (334:32): [Folded - Ignored]
335
0
                let total_size = if is_final {
  Branch (335:37): [True: 0, False: 0]
  Branch (335:37): [Folded - Ignored]
336
0
                    Some(offset + chunk_size)
337
                } else {
338
0
                    Some(max_size as u64)
339
                };
340
341
0
                let chunk_def = ChunkSize::new(offset, offset + chunk_size - 1, total_size);
342
343
                // Upload chunk
344
0
                let status = uploader
345
0
                    .upload_multiple_chunk(chunk, &chunk_def)
346
0
                    .await
347
0
                    .map_err(|e| Self::handle_gcs_error(&e))?;
348
349
                // Update offset for next chunk
350
0
                offset += chunk_size;
351
352
0
                if let UploadStatus::Ok(_) = status {
  Branch (352:24): [True: 0, False: 0]
  Branch (352:24): [Folded - Ignored]
353
0
                    break;
354
0
                }
355
            }
356
357
            // If nothing was uploaded, finalizing with empty content
358
0
            if offset == 0 {
  Branch (358:16): [True: 0, False: 0]
  Branch (358:16): [Folded - Ignored]
359
0
                let chunk_def = ChunkSize::new(0, 0, Some(0));
360
0
                uploader
361
0
                    .upload_multiple_chunk(Vec::new(), &chunk_def)
362
0
                    .await
363
0
                    .map_err(|e| Self::handle_gcs_error(&e))?;
364
0
            }
365
366
            // Check if the object exists
367
0
            match self.read_object_metadata(object_path).await? {
368
0
                Some(_) => Ok(()),
369
0
                None => Err(make_err!(
370
0
                    Code::Internal,
371
0
                    "Upload completed but object not found"
372
0
                )),
373
            }
374
0
        })
375
0
        .await
376
0
    }
377
}
378
379
impl Debug for GcsClient {
380
0
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
381
0
        f.debug_struct("GcsClient")
382
0
            .field("resumable_chunk_size", &self.resumable_chunk_size)
383
0
            .field("max_connections", &self.semaphore.available_permits())
384
0
            .finish_non_exhaustive()
385
0
    }
386
}
387
388
#[async_trait]
389
impl GcsOperations for GcsClient {
390
    async fn read_object_metadata(
391
        &self,
392
        object_path: &ObjectPath,
393
0
    ) -> Result<Option<GcsObject>, Error> {
394
0
        self.with_connection(|| async {
395
0
            let request = GetObjectRequest {
396
0
                bucket: object_path.bucket.clone(),
397
0
                object: object_path.path.clone(),
398
0
                ..Default::default()
399
0
            };
400
401
0
            match self.client.get_object(&request).await {
402
0
                Ok(obj) => Ok(Some(self.convert_to_gcs_object(obj))),
403
0
                Err(err) => {
404
0
                    if let GcsError::Response(resp) = &err {
  Branch (404:28): [True: 0, False: 0]
  Branch (404:28): [Folded - Ignored]
405
0
                        if resp.code == 404 {
  Branch (405:28): [True: 0, False: 0]
  Branch (405:28): [Folded - Ignored]
406
0
                            return Ok(None);
407
0
                        }
408
0
                    }
409
0
                    Err(Self::handle_gcs_error(&err))
410
                }
411
            }
412
0
        })
413
0
        .await
414
0
    }
415
416
    async fn read_object_content(
417
        &self,
418
        object_path: &ObjectPath,
419
        start: u64,
420
        end: Option<u64>,
421
0
    ) -> Result<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + Unpin>, Error> {
422
        type StreamItem = Result<Bytes, google_cloud_storage::http::Error>;
423
        struct ReadStream<T: Stream<Item = StreamItem> + Send + Unpin> {
424
            stream: T,
425
            permit: Option<OwnedSemaphorePermit>,
426
        }
427
428
        impl<T: Stream<Item = StreamItem> + Send + Unpin> Stream for ReadStream<T> {
429
            type Item = Result<Bytes, Error>;
430
431
0
            fn poll_next(
432
0
                mut self: core::pin::Pin<&mut Self>,
433
0
                cx: &mut core::task::Context<'_>,
434
0
            ) -> core::task::Poll<Option<Self::Item>> {
435
0
                match std::pin::pin!(&mut self.stream).poll_next(cx) {
436
0
                    core::task::Poll::Ready(Some(Ok(bytes))) => {
437
0
                        core::task::Poll::Ready(Some(Ok(bytes)))
438
                    }
439
0
                    core::task::Poll::Ready(Some(Err(err))) => {
440
0
                        self.permit.take();
441
0
                        core::task::Poll::Ready(Some(Err(GcsClient::handle_gcs_error(&err))))
442
                    }
443
                    core::task::Poll::Ready(None) => {
444
0
                        self.permit.take();
445
0
                        core::task::Poll::Ready(None)
446
                    }
447
0
                    core::task::Poll::Pending => core::task::Poll::Pending,
448
                }
449
0
            }
450
451
0
            fn size_hint(&self) -> (usize, Option<usize>) {
452
0
                self.stream.size_hint()
453
0
            }
454
        }
455
456
0
        let permit =
457
0
            self.semaphore.clone().acquire_owned().await.map_err(|e| {
458
0
                make_err!(Code::Internal, "Failed to acquire connection permit: {}", e)
459
0
            })?;
460
0
        let request = GetObjectRequest {
461
0
            bucket: object_path.bucket.clone(),
462
0
            object: object_path.path.clone(),
463
0
            ..Default::default()
464
0
        };
465
466
0
        let start = (start > 0).then_some(start);
467
0
        let range = Range(start, end);
468
469
        // Download the object
470
0
        let stream = self
471
0
            .client
472
0
            .download_streamed_object(&request, &range)
473
0
            .await
474
0
            .map_err(|e| Self::handle_gcs_error(&e))?;
475
476
0
        Ok(Box::new(ReadStream {
477
0
            stream,
478
0
            permit: Some(permit),
479
0
        }))
480
0
    }
481
482
0
    async fn write_object(&self, object_path: &ObjectPath, content: Vec<u8>) -> Result<(), Error> {
483
0
        self.with_connection(|| async {
484
0
            let request = UploadObjectRequest {
485
0
                bucket: object_path.bucket.clone(),
486
0
                ..Default::default()
487
0
            };
488
489
0
            let media = Media::new(object_path.path.clone());
490
0
            let upload_type = UploadType::Simple(media);
491
492
0
            self.client
493
0
                .upload_object(&request, content, &upload_type)
494
0
                .await
495
0
                .map_err(|e| Self::handle_gcs_error(&e))?;
496
497
0
            Ok(())
498
0
        })
499
0
        .await
500
0
    }
501
502
0
    async fn start_resumable_write(&self, object_path: &ObjectPath) -> Result<String, Error> {
503
0
        self.with_connection(|| async {
504
0
            let request = UploadObjectRequest {
505
0
                bucket: object_path.bucket.clone(),
506
0
                ..Default::default()
507
0
            };
508
509
0
            let upload_type = UploadType::Multipart(Box::new(Object {
510
0
                name: object_path.path.clone(),
511
0
                content_type: Some(DEFAULT_CONTENT_TYPE.to_string()),
512
0
                ..Default::default()
513
0
            }));
514
515
            // Start resumable upload session
516
0
            let uploader = self
517
0
                .client
518
0
                .prepare_resumable_upload(&request, &upload_type)
519
0
                .await
520
0
                .map_err(|e| Self::handle_gcs_error(&e))?;
521
522
0
            Ok(uploader.url().to_string())
523
0
        })
524
0
        .await
525
0
    }
526
527
    async fn upload_chunk(
528
        &self,
529
        upload_url: &str,
530
        _object_path: &ObjectPath,
531
        data: Bytes,
532
        offset: u64,
533
        end_offset: u64,
534
        total_size: Option<u64>,
535
0
    ) -> Result<(), Error> {
536
0
        self.with_connection(|| async {
537
0
            let uploader = self.client.get_resumable_upload(upload_url.to_string());
538
539
0
            let last_byte = if end_offset == 0 { 0 } else { end_offset - 1 };
  Branch (539:32): [True: 0, False: 0]
  Branch (539:32): [Folded - Ignored]
540
0
            let chunk_def = ChunkSize::new(offset, last_byte, total_size);
541
542
            // Upload chunk
543
0
            uploader
544
0
                .upload_multiple_chunk(data, &chunk_def)
545
0
                .await
546
0
                .map_err(|e| Self::handle_gcs_error(&e))?;
547
548
0
            Ok(())
549
0
        })
550
0
        .await
551
0
    }
552
553
    async fn upload_from_reader(
554
        &self,
555
        object_path: &ObjectPath,
556
        reader: &mut DropCloserReadHalf,
557
        _upload_id: &str,
558
        max_size: u64,
559
0
    ) -> Result<(), Error> {
560
0
        let mut retry_count = 0;
561
0
        let mut retry_delay = INITIAL_UPLOAD_RETRY_DELAY_MS;
562
563
        loop {
564
0
            let result = if max_size < SIMPLE_UPLOAD_THRESHOLD {
  Branch (564:29): [True: 0, False: 0]
  Branch (564:29): [Folded - Ignored]
565
0
                self.read_and_upload_all(object_path, reader, max_size)
566
0
                    .await
567
            } else {
568
0
                self.try_resumable_upload(object_path, reader, max_size)
569
0
                    .await
570
            };
571
572
0
            match result {
573
0
                Ok(()) => return Ok(()),
574
0
                Err(e) => {
575
0
                    let is_retriable = matches!(
576
0
                        e.code,
577
                        Code::Unavailable | Code::ResourceExhausted | Code::DeadlineExceeded
578
0
                    ) || (e.code == Code::Internal
  Branch (578:27): [True: 0, False: 0]
  Branch (578:27): [Folded - Ignored]
579
0
                        && e.to_string().contains("connection"));
580
581
0
                    if !is_retriable || retry_count >= MAX_UPLOAD_RETRIES {
  Branch (581:24): [True: 0, False: 0]
  Branch (581:41): [True: 0, False: 0]
  Branch (581:24): [Folded - Ignored]
  Branch (581:41): [Folded - Ignored]
582
0
                        return Err(e);
583
0
                    }
584
585
0
                    if let Err(reset_err) = reader.try_reset_stream() {
  Branch (585:28): [True: 0, False: 0]
  Branch (585:28): [Folded - Ignored]
586
0
                        return Err(e.merge(reset_err));
587
0
                    }
588
589
0
                    sleep(Duration::from_millis(retry_delay)).await;
590
0
                    retry_delay = core::cmp::min(retry_delay * 2, MAX_UPLOAD_RETRY_DELAY_MS);
591
592
0
                    let mut rng = rand::rng();
593
0
                    let jitter_factor = rng.random::<f64>().mul_add(0.4, 0.8);
594
0
                    retry_delay = (retry_delay as f64 * jitter_factor) as u64;
595
596
0
                    retry_count += 1;
597
                }
598
            }
599
        }
600
0
    }
601
602
0
    async fn object_exists(&self, object_path: &ObjectPath) -> Result<bool, Error> {
603
0
        let metadata = self.read_object_metadata(object_path).await?;
604
0
        Ok(metadata.is_some())
605
0
    }
606
}