Coverage Report

Created: 2025-07-10 19:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/gcs_client/mocks.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::Debug;
16
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
17
use std::collections::HashMap;
18
use std::time::{SystemTime, UNIX_EPOCH};
19
20
use async_trait::async_trait;
21
use bytes::Bytes;
22
use futures::Stream;
23
use nativelink_error::{Code, Error, make_err};
24
use nativelink_util::buf_channel::DropCloserReadHalf;
25
use tokio::sync::RwLock;
26
27
use crate::gcs_client::client::GcsOperations;
28
use crate::gcs_client::types::{DEFAULT_CONTENT_TYPE, GcsObject, ObjectPath, Timestamp};
29
30
/// A mock implementation of `GcsOperations` for testing
31
#[derive(Debug)]
32
pub struct MockGcsOperations {
33
    // Storage of mock objects with their content
34
    objects: RwLock<HashMap<String, MockObject>>,
35
    // Flag to simulate failures
36
    should_fail: AtomicBool,
37
    // Flag to simulate specific failure modes
38
    failure_mode: RwLock<FailureMode>,
39
    // Counter for operation calls
40
    call_counts: CallCounts,
41
    // For capturing requests to verify correct parameter passing
42
    requests: RwLock<Vec<MockRequest>>,
43
}
44
45
#[derive(Debug, Clone)]
46
struct MockObject {
47
    metadata: GcsObject,
48
    content: Vec<u8>,
49
}
50
51
#[derive(Debug, Default)]
52
pub struct CallCounts {
53
    pub metadata_calls: AtomicUsize,
54
    pub read_calls: AtomicUsize,
55
    pub write_calls: AtomicUsize,
56
    pub start_resumable_calls: AtomicUsize,
57
    pub upload_chunk_calls: AtomicUsize,
58
    pub upload_from_reader_calls: AtomicUsize,
59
    pub object_exists_calls: AtomicUsize,
60
}
61
62
impl Clone for CallCounts {
63
11
    fn clone(&self) -> Self {
64
11
        Self {
65
11
            metadata_calls: AtomicUsize::new(self.metadata_calls.load(Ordering::Relaxed)),
66
11
            read_calls: AtomicUsize::new(self.read_calls.load(Ordering::Relaxed)),
67
11
            write_calls: AtomicUsize::new(self.write_calls.load(Ordering::Relaxed)),
68
11
            start_resumable_calls: AtomicUsize::new(
69
11
                self.start_resumable_calls.load(Ordering::Relaxed),
70
11
            ),
71
11
            upload_chunk_calls: AtomicUsize::new(self.upload_chunk_calls.load(Ordering::Relaxed)),
72
11
            upload_from_reader_calls: AtomicUsize::new(
73
11
                self.upload_from_reader_calls.load(Ordering::Relaxed),
74
11
            ),
75
11
            object_exists_calls: AtomicUsize::new(self.object_exists_calls.load(Ordering::Relaxed)),
76
11
        }
77
11
    }
78
}
79
80
#[derive(Debug, Clone)]
81
pub enum MockRequest {
82
    ReadMetadata {
83
        object_path: ObjectPath,
84
    },
85
    ReadContent {
86
        object_path: ObjectPath,
87
        start: u64,
88
        end: Option<u64>,
89
    },
90
    Write {
91
        object_path: ObjectPath,
92
        content_len: usize,
93
    },
94
    StartResumable {
95
        object_path: ObjectPath,
96
    },
97
    UploadChunk {
98
        upload_url: String,
99
        object_path: ObjectPath,
100
        data_len: usize,
101
        offset: u64,
102
        end_offset: u64,
103
        total_size: Option<u64>,
104
    },
105
    UploadFromReader {
106
        object_path: ObjectPath,
107
        upload_id: String,
108
        max_size: u64,
109
    },
110
    ObjectExists {
111
        object_path: ObjectPath,
112
    },
113
}
114
115
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
116
pub enum FailureMode {
117
    #[default]
118
    None,
119
    NotFound,
120
    NetworkError,
121
    Unauthorized,
122
    ServerError,
123
}
124
125
impl MockGcsOperations {
126
    /// Create a new empty mock GCS operations
127
25
    pub fn new() -> Self {
128
25
        Self {
129
25
            objects: RwLock::new(HashMap::new()),
130
25
            should_fail: AtomicBool::new(false),
131
25
            failure_mode: RwLock::new(FailureMode::None),
132
25
            call_counts: CallCounts::default(),
133
25
            requests: RwLock::new(Vec::new()),
134
25
        }
135
25
    }
136
137
    /// Set whether operations should fail or not
138
7
    pub fn set_should_fail(&self, should_fail: bool) {
139
7
        self.should_fail.store(should_fail, Ordering::Relaxed);
140
7
    }
141
142
    /// Set the specific failure mode to simulate
143
4
    pub async fn set_failure_mode(&self, mode: FailureMode) {
144
4
        *self.failure_mode.write().await = mode;
145
4
    }
146
147
    /// Add a mock object to the store
148
14
    pub async fn add_object(&self, path: &ObjectPath, content: Vec<u8>) {
149
14
        let object_key = self.get_object_key(path);
150
151
        // Get current timestamp
152
14
        let now = SystemTime::now()
153
14
            .duration_since(UNIX_EPOCH)
154
14
            .unwrap_or_default()
155
14
            .as_secs() as i64;
156
157
14
        let metadata = GcsObject {
158
14
            name: path.path.clone(),
159
14
            bucket: path.bucket.clone(),
160
14
            size: content.len() as i64,
161
14
            content_type: DEFAULT_CONTENT_TYPE.to_string(),
162
14
            update_time: Some(Timestamp {
163
14
                seconds: now,
164
14
                nanos: 0,
165
14
            }),
166
14
        };
167
168
14
        let mock_object = MockObject { metadata, content };
169
14
        self.objects.write().await.insert(object_key, mock_object);
170
14
    }
171
172
    /// Remove a mock object from the store
173
0
    pub async fn remove_object(&self, path: &ObjectPath) -> bool {
174
0
        let object_key = self.get_object_key(path);
175
0
        self.objects.write().await.remove(&object_key).is_some()
176
0
    }
177
178
    /// Get the count of all operation calls
179
11
    pub fn get_call_counts(&self) -> CallCounts {
180
11
        self.call_counts.clone()
181
11
    }
182
183
    /// Reset all operation counters
184
1
    pub async fn reset_counters(&self) {
185
1
        self.call_counts.metadata_calls.store(0, Ordering::Relaxed);
186
1
        self.call_counts.read_calls.store(0, Ordering::Relaxed);
187
1
        self.call_counts.write_calls.store(0, Ordering::Relaxed);
188
1
        self.call_counts
189
1
            .start_resumable_calls
190
1
            .store(0, Ordering::Relaxed);
191
1
        self.call_counts
192
1
            .upload_chunk_calls
193
1
            .store(0, Ordering::Relaxed);
194
1
        self.call_counts
195
1
            .upload_from_reader_calls
196
1
            .store(0, Ordering::Relaxed);
197
1
        self.call_counts
198
1
            .object_exists_calls
199
1
            .store(0, Ordering::Relaxed);
200
1
        self.requests.write().await.clear();
201
1
    }
202
203
    /// Clear all objects from the store
204
1
    
pub async fn clear_objects(&self)0
{
205
1
        self.objects.write().await.clear();
206
1
    }
207
208
    /// Get all recorded requests
209
5
    pub async fn get_requests(&self) -> Vec<MockRequest> {
210
5
        self.requests.read().await.clone()
211
5
    }
212
213
    /// Helper method to create a consistent key for objects
214
50
    fn get_object_key(&self, path: &ObjectPath) -> String {
215
50
        format!("{}/{}", path.bucket, path.path)
216
50
    }
217
218
    /// Helper method to handle failures based on current settings
219
51
    async fn handle_failure(&self) -> Result<(), Error> {
220
51
        if self.should_fail.load(Ordering::Relaxed) {
  Branch (220:12): [True: 8, False: 43]
  Branch (220:12): [Folded - Ignored]
221
8
            let value = *self.failure_mode.read().await;
222
8
            match value {
223
4
                FailureMode::None => Err(make_err!(Code::Internal, "Simulated generic failure")),
224
                FailureMode::NotFound => {
225
1
                    Err(make_err!(Code::NotFound, "Simulated not found error"))
226
                }
227
                FailureMode::NetworkError => {
228
1
                    Err(make_err!(Code::Unavailable, "Simulated network error"))
229
                }
230
1
                FailureMode::Unauthorized => Err(make_err!(
231
1
                    Code::Unauthenticated,
232
1
                    "Simulated authentication failure"
233
1
                )),
234
                FailureMode::ServerError => {
235
1
                    Err(make_err!(Code::Internal, "Simulated server error"))
236
                }
237
            }
238
        } else {
239
43
            Ok(())
240
        }
241
51
    }
242
243
    /// Add a mock object with a specific timestamp
244
2
    pub async fn add_object_with_timestamp(
245
2
        &self,
246
2
        path: &ObjectPath,
247
2
        content: Vec<u8>,
248
2
        timestamp: i64,
249
2
    ) {
250
2
        let object_key = self.get_object_key(path);
251
252
2
        let metadata = GcsObject {
253
2
            name: path.path.clone(),
254
2
            bucket: path.bucket.clone(),
255
2
            size: content.len() as i64,
256
2
            content_type: DEFAULT_CONTENT_TYPE.to_string(),
257
2
            update_time: Some(Timestamp {
258
2
                seconds: timestamp,
259
2
                nanos: 0,
260
2
            }),
261
2
        };
262
263
2
        let mock_object = MockObject { metadata, content };
264
2
        self.objects.write().await.insert(object_key, mock_object);
265
2
    }
266
267
    /// Get the current timestamp
268
4
    fn get_current_timestamp(&self) -> i64 {
269
4
        SystemTime::now()
270
4
            .duration_since(UNIX_EPOCH)
271
4
            .unwrap_or_default()
272
4
            .as_secs() as i64
273
4
    }
274
}
275
276
#[async_trait]
277
impl GcsOperations for MockGcsOperations {
278
    async fn read_object_metadata(
279
        &self,
280
        object_path: &ObjectPath,
281
36
    ) -> Result<Option<GcsObject>, Error> {
282
18
        self.call_counts
283
18
            .metadata_calls
284
18
            .fetch_add(1, Ordering::Relaxed);
285
18
        self.requests.write().await.push(MockRequest::ReadMetadata {
286
18
            object_path: object_path.clone(),
287
18
        });
288
289
18
        self.handle_failure().await
?6
;
290
291
12
        let object_key = self.get_object_key(object_path);
292
12
        let objects = self.objects.read().await;
293
294
12
        Ok(objects.get(&object_key).map(|obj| 
obj.metadata7
.
clone7
()))
295
36
    }
296
297
    async fn read_object_content(
298
        &self,
299
        object_path: &ObjectPath,
300
        start: u64,
301
        end: Option<u64>,
302
18
    ) -> Result<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + Unpin>, Error> {
303
        struct OnceStream {
304
            content: Option<Bytes>,
305
        }
306
        impl Stream for OnceStream {
307
            type Item = Result<Bytes, Error>;
308
309
10
            fn poll_next(
310
10
                mut self: core::pin::Pin<&mut Self>,
311
10
                _cx: &mut core::task::Context<'_>,
312
10
            ) -> core::task::Poll<Option<Self::Item>> {
313
10
                core::task::Poll::Ready(self.content.take().map(Ok))
314
10
            }
315
316
0
            fn size_hint(&self) -> (usize, Option<usize>) {
317
0
                match &self.content {
318
0
                    Some(bytes) => (bytes.len(), Some(bytes.len())),
319
0
                    None => (0, Some(0)),
320
                }
321
0
            }
322
        }
323
324
9
        self.call_counts.read_calls.fetch_add(1, Ordering::Relaxed);
325
9
        self.requests.write().await.push(MockRequest::ReadContent {
326
9
            object_path: object_path.clone(),
327
9
            start,
328
9
            end,
329
9
        });
330
331
9
        self.handle_failure().await
?1
;
332
333
8
        let object_key = self.get_object_key(object_path);
334
8
        let objects = self.objects.read().await;
335
336
8
        if let Some(obj) = objects.get(&object_key) {
  Branch (336:16): [True: 8, False: 0]
  Branch (336:16): [Folded - Ignored]
337
8
            let content = &obj.content;
338
339
8
            let start_idx = start as usize;
340
8
            if start_idx > content.len() {
  Branch (340:16): [True: 0, False: 8]
  Branch (340:16): [Folded - Ignored]
341
0
                return Err(make_err!(
342
0
                    Code::OutOfRange,
343
0
                    "Start index {} exceeds content length {}",
344
0
                    start,
345
0
                    content.len()
346
0
                ));
347
8
            }
348
349
            // Calculate end index with validation
350
8
            let end_idx = if let Some(
e2
) = end {
  Branch (350:34): [True: 2, False: 6]
  Branch (350:34): [Folded - Ignored]
351
2
                if e < start {
  Branch (351:20): [True: 0, False: 2]
  Branch (351:20): [Folded - Ignored]
352
0
                    return Err(make_err!(
353
0
                        Code::InvalidArgument,
354
0
                        "End index {} must be greater than or equal to start index {}",
355
0
                        e,
356
0
                        start
357
0
                    ));
358
2
                }
359
2
                core::cmp::min(e as usize, content.len())
360
            } else {
361
6
                content.len()
362
            };
363
364
8
            Ok(Box::new(OnceStream {
365
8
                content: Some(Bytes::copy_from_slice(&content[start_idx..end_idx])),
366
8
            }))
367
        } else {
368
0
            Err(make_err!(Code::NotFound, "Object not found"))
369
        }
370
18
    }
371
372
12
    async fn write_object(&self, object_path: &ObjectPath, content: Vec<u8>) -> Result<(), Error> {
373
6
        self.call_counts.write_calls.fetch_add(1, Ordering::Relaxed);
374
6
        self.requests.write().await.push(MockRequest::Write {
375
6
            object_path: object_path.clone(),
376
6
            content_len: content.len(),
377
6
        });
378
379
6
        self.handle_failure().await
?1
;
380
5
        self.add_object(object_path, content).await;
381
5
        Ok(())
382
12
    }
383
384
4
    async fn start_resumable_write(&self, object_path: &ObjectPath) -> Result<String, Error> {
385
2
        self.call_counts
386
2
            .start_resumable_calls
387
2
            .fetch_add(1, Ordering::Relaxed);
388
2
        self.requests
389
2
            .write()
390
2
            .await
391
2
            .push(MockRequest::StartResumable {
392
2
                object_path: object_path.clone(),
393
2
            });
394
395
2
        self.handle_failure().await
?0
;
396
2
        let upload_id = format!("mock-upload-{}-{}", object_path.bucket, object_path.path);
397
2
        Ok(upload_id)
398
4
    }
399
400
    async fn upload_chunk(
401
        &self,
402
        upload_url: &str,
403
        object_path: &ObjectPath,
404
        data: Bytes,
405
        offset: u64,
406
        end_offset: u64,
407
        total_size: Option<u64>,
408
16
    ) -> Result<(), Error> {
409
8
        self.call_counts
410
8
            .upload_chunk_calls
411
8
            .fetch_add(1, Ordering::Relaxed);
412
8
        self.requests.write().await.push(MockRequest::UploadChunk {
413
8
            upload_url: upload_url.to_string(),
414
8
            object_path: object_path.clone(),
415
8
            data_len: data.len(),
416
8
            offset,
417
8
            end_offset,
418
8
            total_size,
419
8
        });
420
421
8
        self.handle_failure().await
?0
;
422
423
8
        let object_key = self.get_object_key(object_path);
424
8
        let mut objects = self.objects.write().await;
425
426
        // Get or create the object
427
8
        let mock_object = objects
428
8
            .entry(object_key.clone())
429
8
            .or_insert_with(|| MockObject {
430
2
                metadata: GcsObject {
431
2
                    name: object_path.path.clone(),
432
2
                    bucket: object_path.bucket.clone(),
433
2
                    size: 0,
434
2
                    content_type: DEFAULT_CONTENT_TYPE.to_string(),
435
2
                    update_time: Some(Timestamp {
436
2
                        seconds: self.get_current_timestamp(),
437
2
                        nanos: 0,
438
2
                    }),
439
2
                },
440
2
                content: Vec::new(),
441
2
            });
442
443
        // Handle the chunk data
444
8
        let offset_usize = offset as usize;
445
8
        if mock_object.content.len() < offset_usize + data.len() {
  Branch (445:12): [True: 8, False: 0]
  Branch (445:12): [Folded - Ignored]
446
8
            mock_object.content.resize(offset_usize + data.len(), 0);
447
8
        
}0
448
449
8
        if !data.is_empty() {
  Branch (449:12): [True: 8, False: 0]
  Branch (449:12): [Folded - Ignored]
450
8
            mock_object.content[offset_usize..offset_usize + data.len()].copy_from_slice(&data);
451
8
        
}0
452
453
        // Update metadata if this is the final chunk
454
8
        if total_size.map(|size| 
size7
==
end_offset7
) == Some(true) {
  Branch (454:12): [True: 2, False: 6]
  Branch (454:12): [Folded - Ignored]
455
2
            mock_object.metadata.size = mock_object.content.len() as i64;
456
2
            mock_object.metadata.update_time = Some(Timestamp {
457
2
                seconds: self.get_current_timestamp(),
458
2
                nanos: 0,
459
2
            });
460
6
        }
461
462
8
        Ok(())
463
16
    }
464
465
    async fn upload_from_reader(
466
        &self,
467
        object_path: &ObjectPath,
468
        reader: &mut DropCloserReadHalf,
469
        upload_id: &str,
470
        max_size: u64,
471
4
    ) -> Result<(), Error> {
472
2
        self.call_counts
473
2
            .upload_from_reader_calls
474
2
            .fetch_add(1, Ordering::Relaxed);
475
2
        self.requests
476
2
            .write()
477
2
            .await
478
2
            .push(MockRequest::UploadFromReader {
479
2
                object_path: object_path.clone(),
480
2
                upload_id: upload_id.to_string(),
481
2
                max_size,
482
2
            });
483
484
2
        self.handle_failure().await
?0
;
485
486
        // Read all data from the reader
487
2
        let mut buffer = Vec::new();
488
2
        let max_size = max_size as usize;
489
2
        let mut total_read = 0usize;
490
491
4
        while total_read < max_size {
  Branch (491:15): [True: 3, False: 1]
  Branch (491:15): [Folded - Ignored]
492
3
            let to_read = core::cmp::min(max_size - total_read, 8192); // 8KB chunks
493
3
            let chunk = reader.consume(Some(to_read)).await
?0
;
494
495
3
            if chunk.is_empty() {
  Branch (495:16): [True: 1, False: 2]
  Branch (495:16): [Folded - Ignored]
496
1
                break;
497
2
            }
498
499
2
            buffer.extend_from_slice(&chunk);
500
2
            total_read += chunk.len();
501
        }
502
503
2
        self.write_object(object_path, buffer).await
?0
;
504
505
2
        Ok(())
506
4
    }
507
508
12
    async fn object_exists(&self, object_path: &ObjectPath) -> Result<bool, Error> {
509
6
        self.call_counts
510
6
            .object_exists_calls
511
6
            .fetch_add(1, Ordering::Relaxed);
512
6
        self.requests.write().await.push(MockRequest::ObjectExists {
513
6
            object_path: object_path.clone(),
514
6
        });
515
516
6
        self.handle_failure().await
?0
;
517
518
6
        let object_key = self.get_object_key(object_path);
519
6
        let objects = self.objects.read().await;
520
521
6
        Ok(objects.contains_key(&object_key))
522
12
    }
523
}
524
525
impl Default for MockGcsOperations {
526
0
    fn default() -> Self {
527
0
        Self::new()
528
0
    }
529
}