Coverage Report

Created: 2025-10-24 04:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/gcs_client/mocks.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::Debug;
16
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
17
use std::collections::HashMap;
18
use std::time::{SystemTime, UNIX_EPOCH};
19
20
use bytes::Bytes;
21
use futures::Stream;
22
use nativelink_error::{Code, Error, make_err};
23
use nativelink_util::buf_channel::DropCloserReadHalf;
24
use tokio::sync::RwLock;
25
26
use crate::gcs_client::client::GcsOperations;
27
use crate::gcs_client::types::{DEFAULT_CONTENT_TYPE, GcsObject, ObjectPath, Timestamp};
28
29
/// A mock implementation of `GcsOperations` for testing
30
#[derive(Debug)]
31
pub struct MockGcsOperations {
32
    // Storage of mock objects with their content
33
    objects: RwLock<HashMap<String, MockObject>>,
34
    // Flag to simulate failures
35
    should_fail: AtomicBool,
36
    // Flag to simulate specific failure modes
37
    failure_mode: RwLock<FailureMode>,
38
    // Counter for operation calls
39
    call_counts: CallCounts,
40
    // For capturing requests to verify correct parameter passing
41
    requests: RwLock<Vec<MockRequest>>,
42
}
43
44
#[derive(Debug, Clone)]
45
struct MockObject {
46
    metadata: GcsObject,
47
    content: Vec<u8>,
48
}
49
50
/// Per-operation invocation counters maintained by the mock.
#[derive(Debug, Default)]
pub struct CallCounts {
    pub metadata_calls: AtomicUsize,
    pub read_calls: AtomicUsize,
    pub write_calls: AtomicUsize,
    pub start_resumable_calls: AtomicUsize,
    pub upload_chunk_calls: AtomicUsize,
    pub upload_from_reader_calls: AtomicUsize,
    pub object_exists_calls: AtomicUsize,
}

impl Clone for CallCounts {
    /// Produces an independent copy holding the values each counter had when
    /// it was read; later updates to `self` do not affect the copy.
    ///
    /// Written by hand because `AtomicUsize` does not implement `Clone`.
    fn clone(&self) -> Self {
        // Read one counter and wrap the observed value in a fresh atomic.
        let snapshot = |counter: &AtomicUsize| AtomicUsize::new(counter.load(Ordering::Relaxed));

        Self {
            metadata_calls: snapshot(&self.metadata_calls),
            read_calls: snapshot(&self.read_calls),
            write_calls: snapshot(&self.write_calls),
            start_resumable_calls: snapshot(&self.start_resumable_calls),
            upload_chunk_calls: snapshot(&self.upload_chunk_calls),
            upload_from_reader_calls: snapshot(&self.upload_from_reader_calls),
            object_exists_calls: snapshot(&self.object_exists_calls),
        }
    }
}
78
79
#[derive(Debug, Clone)]
80
pub enum MockRequest {
81
    ReadMetadata {
82
        object_path: ObjectPath,
83
    },
84
    ReadContent {
85
        object_path: ObjectPath,
86
        start: u64,
87
        end: Option<u64>,
88
    },
89
    Write {
90
        object_path: ObjectPath,
91
        content_len: usize,
92
    },
93
    StartResumable {
94
        object_path: ObjectPath,
95
    },
96
    UploadChunk {
97
        upload_url: String,
98
        object_path: ObjectPath,
99
        data_len: usize,
100
        offset: u64,
101
        end_offset: u64,
102
        total_size: Option<u64>,
103
    },
104
    UploadFromReader {
105
        object_path: ObjectPath,
106
        upload_id: String,
107
        max_size: u64,
108
    },
109
    ObjectExists {
110
        object_path: ObjectPath,
111
    },
112
}
113
114
/// Kind of error the mock produces while it is set to fail.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum FailureMode {
    /// No specific mode selected; a generic `Code::Internal` error is produced.
    #[default]
    None,
    /// Produces a `Code::NotFound` error.
    NotFound,
    /// Produces a `Code::Unavailable` error.
    NetworkError,
    /// Produces a `Code::Unauthenticated` error.
    Unauthorized,
    /// Produces a `Code::Internal` error described as a server error.
    ServerError,
}
123
124
impl MockGcsOperations {
125
    /// Create a new empty mock GCS operations
126
27
    pub fn new() -> Self {
127
27
        Self {
128
27
            objects: RwLock::new(HashMap::new()),
129
27
            should_fail: AtomicBool::new(false),
130
27
            failure_mode: RwLock::new(FailureMode::None),
131
27
            call_counts: CallCounts::default(),
132
27
            requests: RwLock::new(Vec::new()),
133
27
        }
134
27
    }
135
136
    /// Set whether operations should fail or not
137
7
    pub fn set_should_fail(&self, should_fail: bool) {
138
7
        self.should_fail.store(should_fail, Ordering::Relaxed);
139
7
    }
140
141
    /// Set the specific failure mode to simulate
142
4
    
pub async fn set_failure_mode(&self, mode: FailureMode)0
{
143
4
        *self.failure_mode.write().await = mode;
144
4
    }
145
146
    /// Add a mock object to the store
147
14
    
pub async fn add_object(&self, path: &ObjectPath, content: Vec<u8>)0
{
148
14
        let object_key = self.get_object_key(path);
149
150
        // Get current timestamp
151
14
        let now = SystemTime::now()
152
14
            .duration_since(UNIX_EPOCH)
153
14
            .unwrap_or_default()
154
14
            .as_secs() as i64;
155
156
14
        let metadata = GcsObject {
157
14
            name: path.path.clone(),
158
14
            bucket: path.bucket.clone(),
159
14
            size: content.len() as i64,
160
14
            content_type: DEFAULT_CONTENT_TYPE.to_string(),
161
14
            update_time: Some(Timestamp {
162
14
                seconds: now,
163
14
                nanos: 0,
164
14
            }),
165
14
        };
166
167
14
        let mock_object = MockObject { metadata, content };
168
14
        self.objects.write().await.insert(object_key, mock_object);
169
14
    }
170
171
    /// Remove a mock object from the store
172
0
    pub async fn remove_object(&self, path: &ObjectPath) -> bool {
173
0
        let object_key = self.get_object_key(path);
174
0
        self.objects.write().await.remove(&object_key).is_some()
175
0
    }
176
177
    /// Get the count of all operation calls
178
11
    pub fn get_call_counts(&self) -> CallCounts {
179
11
        self.call_counts.clone()
180
11
    }
181
182
    /// Reset all operation counters
183
1
    
pub async fn reset_counters(&self)0
{
184
1
        self.call_counts.metadata_calls.store(0, Ordering::Relaxed);
185
1
        self.call_counts.read_calls.store(0, Ordering::Relaxed);
186
1
        self.call_counts.write_calls.store(0, Ordering::Relaxed);
187
1
        self.call_counts
188
1
            .start_resumable_calls
189
1
            .store(0, Ordering::Relaxed);
190
1
        self.call_counts
191
1
            .upload_chunk_calls
192
1
            .store(0, Ordering::Relaxed);
193
1
        self.call_counts
194
1
            .upload_from_reader_calls
195
1
            .store(0, Ordering::Relaxed);
196
1
        self.call_counts
197
1
            .object_exists_calls
198
1
            .store(0, Ordering::Relaxed);
199
1
        self.requests.write().await.clear();
200
1
    }
201
202
    /// Clear all objects from the store
203
1
    
pub async fn clear_objects(&self)0
{
204
1
        self.objects.write().await.clear();
205
1
    }
206
207
    /// Get all recorded requests
208
6
    pub async fn get_requests(&self) -> Vec<MockRequest> {
209
6
        self.requests.read().await.clone()
210
6
    }
211
212
    /// Helper method to create a consistent key for objects
213
50
    fn get_object_key(&self, path: &ObjectPath) -> String {
214
50
        format!("{}/{}", path.bucket, path.path)
215
50
    }
216
217
    /// Helper method to handle failures based on current settings
218
51
    async fn handle_failure(&self) -> Result<(), Error> {
219
51
        if self.should_fail.load(Ordering::Relaxed) {
  Branch (219:12): [Folded - Ignored]
  Branch (219:12): [True: 7, False: 24]
  Branch (219:12): [True: 1, False: 19]
  Branch (219:12): [Folded - Ignored]
220
8
            let value = *self.failure_mode.read().await;
221
8
            match value {
222
4
                FailureMode::None => Err(make_err!(Code::Internal, "Simulated generic failure")),
223
                FailureMode::NotFound => {
224
1
                    Err(make_err!(Code::NotFound, "Simulated not found error"))
225
                }
226
                FailureMode::NetworkError => {
227
1
                    Err(make_err!(Code::Unavailable, "Simulated network error"))
228
                }
229
1
                FailureMode::Unauthorized => Err(make_err!(
230
1
                    Code::Unauthenticated,
231
1
                    "Simulated authentication failure"
232
1
                )),
233
                FailureMode::ServerError => {
234
1
                    Err(make_err!(Code::Internal, "Simulated server error"))
235
                }
236
            }
237
        } else {
238
43
            Ok(())
239
        }
240
51
    }
241
242
    /// Add a mock object with a specific timestamp
243
0
    pub async fn add_object_with_timestamp(
244
0
        &self,
245
0
        path: &ObjectPath,
246
0
        content: Vec<u8>,
247
0
        timestamp: i64,
248
2
    ) {
249
2
        let object_key = self.get_object_key(path);
250
251
2
        let metadata = GcsObject {
252
2
            name: path.path.clone(),
253
2
            bucket: path.bucket.clone(),
254
2
            size: content.len() as i64,
255
2
            content_type: DEFAULT_CONTENT_TYPE.to_string(),
256
2
            update_time: Some(Timestamp {
257
2
                seconds: timestamp,
258
2
                nanos: 0,
259
2
            }),
260
2
        };
261
262
2
        let mock_object = MockObject { metadata, content };
263
2
        self.objects.write().await.insert(object_key, mock_object);
264
2
    }
265
266
    /// Get the current timestamp
267
4
    fn get_current_timestamp(&self) -> i64 {
268
4
        SystemTime::now()
269
4
            .duration_since(UNIX_EPOCH)
270
4
            .unwrap_or_default()
271
4
            .as_secs() as i64
272
4
    }
273
}
274
275
impl GcsOperations for MockGcsOperations {
276
18
    async fn read_object_metadata(
277
18
        &self,
278
18
        object_path: &ObjectPath,
279
18
    ) -> Result<Option<GcsObject>, Error> {
280
18
        self.call_counts
281
18
            .metadata_calls
282
18
            .fetch_add(1, Ordering::Relaxed);
283
18
        self.requests.write().await.push(MockRequest::ReadMetadata {
284
18
            object_path: object_path.clone(),
285
18
        });
286
287
18
        self.handle_failure().await
?6
;
288
289
12
        let object_key = self.get_object_key(object_path);
290
12
        let objects = self.objects.read().await;
291
292
12
        Ok(objects.get(&object_key).map(|obj| 
obj.metadata7
.
clone7
()))
293
18
    }
294
295
0
    async fn read_object_content(
296
0
        &self,
297
0
        object_path: &ObjectPath,
298
0
        start: u64,
299
0
        end: Option<u64>,
300
9
    ) -> Result<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + Unpin>, Error> {
301
        struct OnceStream {
302
            content: Option<Bytes>,
303
        }
304
        impl Stream for OnceStream {
305
            type Item = Result<Bytes, Error>;
306
307
0
            fn poll_next(
308
0
                mut self: core::pin::Pin<&mut Self>,
309
0
                _cx: &mut core::task::Context<'_>,
310
0
            ) -> core::task::Poll<Option<Self::Item>> {
311
0
                core::task::Poll::Ready(self.content.take().map(Ok))
312
0
            }
313
314
0
            fn size_hint(&self) -> (usize, Option<usize>) {
315
0
                match &self.content {
316
0
                    Some(bytes) => (bytes.len(), Some(bytes.len())),
317
0
                    None => (0, Some(0)),
318
                }
319
0
            }
320
        }
321
322
9
        self.call_counts.read_calls.fetch_add(1, Ordering::Relaxed);
323
9
        self.requests.write().await.push(MockRequest::ReadContent {
324
9
            object_path: object_path.clone(),
325
9
            start,
326
9
            end,
327
9
        });
328
329
9
        self.handle_failure().await
?1
;
330
331
8
        let object_key = self.get_object_key(object_path);
332
8
        let objects = self.objects.read().await;
333
334
8
        if let Some(obj) = objects.get(&object_key) {
  Branch (334:16): [Folded - Ignored]
  Branch (334:16): [True: 6, False: 0]
  Branch (334:16): [True: 2, False: 0]
  Branch (334:16): [Folded - Ignored]
335
8
            let content = &obj.content;
336
337
8
            let start_idx = usize::try_from(start).unwrap_or(usize::MAX);
338
8
            if start_idx > content.len() {
  Branch (338:16): [Folded - Ignored]
  Branch (338:16): [True: 0, False: 6]
  Branch (338:16): [True: 0, False: 2]
  Branch (338:16): [Folded - Ignored]
339
0
                return Err(make_err!(
340
0
                    Code::OutOfRange,
341
0
                    "Start index {} exceeds content length {}",
342
0
                    start,
343
0
                    content.len()
344
0
                ));
345
8
            }
346
347
            // Calculate end index with validation
348
8
            let end_idx = if let Some(
e2
) = end {
  Branch (348:34): [Folded - Ignored]
  Branch (348:34): [True: 1, False: 5]
  Branch (348:34): [True: 1, False: 1]
  Branch (348:34): [Folded - Ignored]
349
2
                if e < start {
  Branch (349:20): [Folded - Ignored]
  Branch (349:20): [True: 0, False: 1]
  Branch (349:20): [True: 0, False: 1]
  Branch (349:20): [Folded - Ignored]
350
0
                    return Err(make_err!(
351
0
                        Code::InvalidArgument,
352
0
                        "End index {} must be greater than or equal to start index {}",
353
0
                        e,
354
0
                        start
355
0
                    ));
356
2
                }
357
2
                core::cmp::min(usize::try_from(e).unwrap_or(usize::MAX), content.len())
358
            } else {
359
6
                content.len()
360
            };
361
362
8
            Ok(Box::new(OnceStream {
363
8
                content: Some(Bytes::copy_from_slice(&content[start_idx..end_idx])),
364
8
            }))
365
        } else {
366
0
            Err(make_err!(Code::NotFound, "Object not found"))
367
        }
368
9
    }
369
370
6
    
async fn write_object(&self, object_path: &ObjectPath, content: Vec<u8>) -> Result<(), Error>0
{
371
6
        self.call_counts.write_calls.fetch_add(1, Ordering::Relaxed);
372
6
        self.requests.write().await.push(MockRequest::Write {
373
6
            object_path: object_path.clone(),
374
6
            content_len: content.len(),
375
6
        });
376
377
6
        self.handle_failure().await
?1
;
378
5
        self.add_object(object_path, content).await;
379
5
        Ok(())
380
6
    }
381
382
2
    async fn start_resumable_write(&self, object_path: &ObjectPath) -> Result<String, Error> {
383
2
        self.call_counts
384
2
            .start_resumable_calls
385
2
            .fetch_add(1, Ordering::Relaxed);
386
2
        self.requests
387
2
            .write()
388
2
            .await
389
2
            .push(MockRequest::StartResumable {
390
2
                object_path: object_path.clone(),
391
2
            });
392
393
2
        self.handle_failure().await
?0
;
394
2
        let upload_id = format!("mock-upload-{}-{}", object_path.bucket, object_path.path);
395
2
        Ok(upload_id)
396
2
    }
397
398
0
    async fn upload_chunk(
399
0
        &self,
400
0
        upload_url: &str,
401
0
        object_path: &ObjectPath,
402
0
        data: Bytes,
403
0
        offset: u64,
404
0
        end_offset: u64,
405
0
        total_size: Option<u64>,
406
8
    ) -> Result<(), Error> {
407
8
        self.call_counts
408
8
            .upload_chunk_calls
409
8
            .fetch_add(1, Ordering::Relaxed);
410
8
        self.requests.write().await.push(MockRequest::UploadChunk {
411
8
            upload_url: upload_url.to_string(),
412
8
            object_path: object_path.clone(),
413
8
            data_len: data.len(),
414
8
            offset,
415
8
            end_offset,
416
8
            total_size,
417
8
        });
418
419
8
        self.handle_failure().await
?0
;
420
421
8
        let object_key = self.get_object_key(object_path);
422
8
        let mut objects = self.objects.write().await;
423
424
        // Get or create the object
425
8
        let mock_object = objects
426
8
            .entry(object_key.clone())
427
8
            .or_insert_with(|| MockObject {
428
2
                metadata: GcsObject {
429
2
                    name: object_path.path.clone(),
430
2
                    bucket: object_path.bucket.clone(),
431
2
                    size: 0,
432
2
                    content_type: DEFAULT_CONTENT_TYPE.to_string(),
433
2
                    update_time: Some(Timestamp {
434
2
                        seconds: self.get_current_timestamp(),
435
2
                        nanos: 0,
436
2
                    }),
437
2
                },
438
2
                content: Vec::new(),
439
2
            });
440
441
        // Handle the chunk data
442
8
        let offset_usize = usize::try_from(offset).unwrap_or(usize::MAX);
443
8
        if mock_object.content.len() < offset_usize + data.len() {
  Branch (443:12): [Folded - Ignored]
  Branch (443:12): [True: 2, False: 0]
  Branch (443:12): [True: 6, False: 0]
  Branch (443:12): [Folded - Ignored]
444
8
            mock_object.content.resize(offset_usize + data.len(), 0);
445
8
        
}0
446
447
8
        if !data.is_empty() {
  Branch (447:12): [Folded - Ignored]
  Branch (447:12): [True: 2, False: 0]
  Branch (447:12): [True: 6, False: 0]
  Branch (447:12): [Folded - Ignored]
448
8
            mock_object.content[offset_usize..offset_usize + data.len()].copy_from_slice(&data);
449
8
        
}0
450
451
        // Update metadata if this is the final chunk
452
8
        if total_size.map(|size| 
size7
==
end_offset7
) == Some(true) {
  Branch (452:12): [Folded - Ignored]
  Branch (452:12): [True: 1, False: 1]
  Branch (452:12): [True: 1, False: 5]
  Branch (452:12): [Folded - Ignored]
453
2
            mock_object.metadata.size = mock_object.content.len() as i64;
454
2
            mock_object.metadata.update_time = Some(Timestamp {
455
2
                seconds: self.get_current_timestamp(),
456
2
                nanos: 0,
457
2
            });
458
6
        }
459
460
8
        Ok(())
461
8
    }
462
463
2
    async fn upload_from_reader(
464
2
        &self,
465
2
        object_path: &ObjectPath,
466
2
        reader: &mut DropCloserReadHalf,
467
2
        upload_id: &str,
468
2
        max_size: u64,
469
2
    ) -> Result<(), Error> {
470
2
        self.call_counts
471
2
            .upload_from_reader_calls
472
2
            .fetch_add(1, Ordering::Relaxed);
473
2
        self.requests
474
2
            .write()
475
2
            .await
476
2
            .push(MockRequest::UploadFromReader {
477
2
                object_path: object_path.clone(),
478
2
                upload_id: upload_id.to_string(),
479
2
                max_size,
480
2
            });
481
482
2
        self.handle_failure().await
?0
;
483
484
        // Read all data from the reader
485
2
        let mut buffer = Vec::new();
486
2
        let max_size = usize::try_from(max_size).unwrap_or(usize::MAX);
487
2
        let mut total_read = 0usize;
488
489
4
        while total_read < max_size {
  Branch (489:15): [Folded - Ignored]
  Branch (489:15): [True: 3, False: 1]
  Branch (489:15): [Folded - Ignored]
490
3
            let to_read = core::cmp::min(max_size - total_read, 8192); // 8KB chunks
491
3
            let chunk = reader.consume(Some(to_read)).await
?0
;
492
493
3
            if chunk.is_empty() {
  Branch (493:16): [Folded - Ignored]
  Branch (493:16): [True: 1, False: 2]
  Branch (493:16): [Folded - Ignored]
494
1
                break;
495
2
            }
496
497
2
            buffer.extend_from_slice(&chunk);
498
2
            total_read += chunk.len();
499
        }
500
501
2
        self.write_object(object_path, buffer).await
?0
;
502
503
2
        Ok(())
504
2
    }
505
506
6
    async fn object_exists(&self, object_path: &ObjectPath) -> Result<bool, Error> {
507
6
        self.call_counts
508
6
            .object_exists_calls
509
6
            .fetch_add(1, Ordering::Relaxed);
510
6
        self.requests.write().await.push(MockRequest::ObjectExists {
511
6
            object_path: object_path.clone(),
512
6
        });
513
514
6
        self.handle_failure().await
?0
;
515
516
6
        let object_key = self.get_object_key(object_path);
517
6
        let objects = self.objects.read().await;
518
519
6
        Ok(objects.contains_key(&object_key))
520
6
    }
521
}
522
523
impl Default for MockGcsOperations {
524
0
    fn default() -> Self {
525
0
        Self::new()
526
0
    }
527
}