Coverage Report

Created: 2025-05-08 18:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/gcs_client/mocks.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::Debug;
16
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
17
use std::collections::HashMap;
18
use std::time::{SystemTime, UNIX_EPOCH};
19
20
use async_trait::async_trait;
21
use nativelink_error::{Code, Error, make_err};
22
use nativelink_util::buf_channel::DropCloserReadHalf;
23
use tokio::sync::RwLock;
24
25
use crate::gcs_client::client::GcsOperations;
26
use crate::gcs_client::types::{DEFAULT_CONTENT_TYPE, GcsObject, ObjectPath, Timestamp};
27
28
/// A mock implementation of `GcsOperations` for testing
29
#[derive(Debug)]
30
pub struct MockGcsOperations {
31
    // Storage of mock objects with their content
32
    objects: RwLock<HashMap<String, MockObject>>,
33
    // Flag to simulate failures
34
    should_fail: AtomicBool,
35
    // Flag to simulate specific failure modes
36
    failure_mode: RwLock<FailureMode>,
37
    // Counter for operation calls
38
    call_counts: CallCounts,
39
    // For capturing requests to verify correct parameter passing
40
    requests: RwLock<Vec<MockRequest>>,
41
}
42
43
#[derive(Debug, Clone)]
44
struct MockObject {
45
    metadata: GcsObject,
46
    content: Vec<u8>,
47
}
48
49
#[derive(Debug, Default)]
50
pub struct CallCounts {
51
    pub metadata_calls: AtomicUsize,
52
    pub read_calls: AtomicUsize,
53
    pub write_calls: AtomicUsize,
54
    pub start_resumable_calls: AtomicUsize,
55
    pub upload_chunk_calls: AtomicUsize,
56
    pub upload_from_reader_calls: AtomicUsize,
57
    pub object_exists_calls: AtomicUsize,
58
}
59
60
impl Clone for CallCounts {
61
11
    fn clone(&self) -> Self {
62
11
        Self {
63
11
            metadata_calls: AtomicUsize::new(self.metadata_calls.load(Ordering::Relaxed)),
64
11
            read_calls: AtomicUsize::new(self.read_calls.load(Ordering::Relaxed)),
65
11
            write_calls: AtomicUsize::new(self.write_calls.load(Ordering::Relaxed)),
66
11
            start_resumable_calls: AtomicUsize::new(
67
11
                self.start_resumable_calls.load(Ordering::Relaxed),
68
11
            ),
69
11
            upload_chunk_calls: AtomicUsize::new(self.upload_chunk_calls.load(Ordering::Relaxed)),
70
11
            upload_from_reader_calls: AtomicUsize::new(
71
11
                self.upload_from_reader_calls.load(Ordering::Relaxed),
72
11
            ),
73
11
            object_exists_calls: AtomicUsize::new(self.object_exists_calls.load(Ordering::Relaxed)),
74
11
        }
75
11
    }
76
}
77
78
#[derive(Debug, Clone)]
79
pub enum MockRequest {
80
    ReadMetadata {
81
        object_path: ObjectPath,
82
    },
83
    ReadContent {
84
        object_path: ObjectPath,
85
        start: i64,
86
        end: Option<i64>,
87
    },
88
    Write {
89
        object_path: ObjectPath,
90
        content_len: usize,
91
    },
92
    StartResumable {
93
        object_path: ObjectPath,
94
    },
95
    UploadChunk {
96
        upload_url: String,
97
        object_path: ObjectPath,
98
        data_len: usize,
99
        offset: i64,
100
        end_offset: i64,
101
        is_final: bool,
102
    },
103
    UploadFromReader {
104
        object_path: ObjectPath,
105
        upload_id: String,
106
        max_size: i64,
107
    },
108
    ObjectExists {
109
        object_path: ObjectPath,
110
    },
111
}
112
113
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
114
pub enum FailureMode {
115
    #[default]
116
    None,
117
    NotFound,
118
    NetworkError,
119
    Unauthorized,
120
    ServerError,
121
}
122
123
impl MockGcsOperations {
124
    /// Create a new empty mock GCS operations
125
24
    pub fn new() -> Self {
126
24
        Self {
127
24
            objects: RwLock::new(HashMap::new()),
128
24
            should_fail: AtomicBool::new(false),
129
24
            failure_mode: RwLock::new(FailureMode::None),
130
24
            call_counts: CallCounts::default(),
131
24
            requests: RwLock::new(Vec::new()),
132
24
        }
133
24
    }
134
135
    /// Set whether operations should fail or not
136
6
    pub fn set_should_fail(&self, should_fail: bool) {
137
6
        self.should_fail.store(should_fail, Ordering::Relaxed);
138
6
    }
139
140
    /// Set the specific failure mode to simulate
141
4
    
pub async fn set_failure_mode(&self, mode: FailureMode) 0
{
142
4
        *self.failure_mode.write().await = mode;
143
4
    }
144
145
    /// Add a mock object to the store
146
14
    pub async fn add_object(&self, path: &ObjectPath, content: Vec<u8>) {
147
14
        let object_key = self.get_object_key(path);
148
14
149
14
        // Get current timestamp
150
14
        let now = SystemTime::now()
151
14
            .duration_since(UNIX_EPOCH)
152
14
            .unwrap_or_default()
153
14
            .as_secs() as i64;
154
14
155
14
        let metadata = GcsObject {
156
14
            name: path.path.clone(),
157
14
            bucket: path.bucket.clone(),
158
14
            size: content.len() as i64,
159
14
            content_type: DEFAULT_CONTENT_TYPE.to_string(),
160
14
            update_time: Some(Timestamp {
161
14
                seconds: now,
162
14
                nanos: 0,
163
14
            }),
164
14
        };
165
14
166
14
        let mock_object = MockObject { metadata, content };
167
14
        self.objects.write().await.insert(object_key, mock_object);
168
14
    }
169
170
    /// Remove a mock object from the store
171
0
    pub async fn remove_object(&self, path: &ObjectPath) -> bool {
172
0
        let object_key = self.get_object_key(path);
173
0
        self.objects.write().await.remove(&object_key).is_some()
174
0
    }
175
176
    /// Get the count of all operation calls
177
11
    pub fn get_call_counts(&self) -> CallCounts {
178
11
        self.call_counts.clone()
179
11
    }
180
181
    /// Reset all operation counters
182
1
    
pub async fn reset_counters(&self) 0
{
183
1
        self.call_counts.metadata_calls.store(0, Ordering::Relaxed);
184
1
        self.call_counts.read_calls.store(0, Ordering::Relaxed);
185
1
        self.call_counts.write_calls.store(0, Ordering::Relaxed);
186
1
        self.call_counts
187
1
            .start_resumable_calls
188
1
            .store(0, Ordering::Relaxed);
189
1
        self.call_counts
190
1
            .upload_chunk_calls
191
1
            .store(0, Ordering::Relaxed);
192
1
        self.call_counts
193
1
            .upload_from_reader_calls
194
1
            .store(0, Ordering::Relaxed);
195
1
        self.call_counts
196
1
            .object_exists_calls
197
1
            .store(0, Ordering::Relaxed);
198
1
        self.requests.write().await.clear();
199
1
    }
200
201
    /// Clear all objects from the store
202
1
    pub async fn clear_objects(&self) {
203
1
        self.objects.write().await.clear();
204
1
    }
205
206
    /// Get all recorded requests
207
5
    pub async fn get_requests(&self) -> Vec<MockRequest> {
208
5
        self.requests.read().await.clone()
209
5
    }
210
211
    /// Helper method to create a consistent key for objects
212
50
    fn get_object_key(&self, path: &ObjectPath) -> String {
213
50
        format!("{}/{}", path.bucket, path.path)
214
50
    }
215
216
    /// Helper method to handle failures based on current settings
217
50
    async fn handle_failure(&self) -> Result<(), Error> {
218
50
        if self.should_fail.load(Ordering::Relaxed) {
  Branch (218:12): [True: 7, False: 43]
  Branch (218:12): [Folded - Ignored]
219
7
            let value = *self.failure_mode.read().await;
220
7
            match value {
221
3
                FailureMode::None => Err(make_err!(Code::Internal, "Simulated generic failure")),
222
                FailureMode::NotFound => {
223
1
                    Err(make_err!(Code::NotFound, "Simulated not found error"))
224
                }
225
                FailureMode::NetworkError => {
226
1
                    Err(make_err!(Code::Unavailable, "Simulated network error"))
227
                }
228
1
                FailureMode::Unauthorized => Err(make_err!(
229
1
                    Code::Unauthenticated,
230
1
                    "Simulated authentication failure"
231
1
                )),
232
                FailureMode::ServerError => {
233
1
                    Err(make_err!(Code::Internal, "Simulated server error"))
234
                }
235
            }
236
        } else {
237
43
            Ok(())
238
        }
239
50
    }
240
241
    /// Add a mock object with a specific timestamp
242
0
    pub async fn add_object_with_timestamp(
243
0
        &self,
244
0
        path: &ObjectPath,
245
0
        content: Vec<u8>,
246
0
        timestamp: i64,
247
2
    ) {
248
2
        let object_key = self.get_object_key(path);
249
2
250
2
        let metadata = GcsObject {
251
2
            name: path.path.clone(),
252
2
            bucket: path.bucket.clone(),
253
2
            size: content.len() as i64,
254
2
            content_type: DEFAULT_CONTENT_TYPE.to_string(),
255
2
            update_time: Some(Timestamp {
256
2
                seconds: timestamp,
257
2
                nanos: 0,
258
2
            }),
259
2
        };
260
2
261
2
        let mock_object = MockObject { metadata, content };
262
2
        self.objects.write().await.insert(object_key, mock_object);
263
2
    }
264
265
    /// Get the current timestamp
266
4
    fn get_current_timestamp(&self) -> i64 {
267
4
        SystemTime::now()
268
4
            .duration_since(UNIX_EPOCH)
269
4
            .unwrap_or_default()
270
4
            .as_secs() as i64
271
4
    }
272
}
273
274
#[async_trait]
275
impl GcsOperations for MockGcsOperations {
276
    async fn read_object_metadata(
277
        &self,
278
        object_path: &ObjectPath,
279
34
    ) -> Result<Option<GcsObject>, Error> {
280
17
        self.call_counts
281
17
            .metadata_calls
282
17
            .fetch_add(1, Ordering::Relaxed);
283
17
        self.requests.write().await.push(MockRequest::ReadMetadata {
284
17
            object_path: object_path.clone(),
285
17
        });
286
17
287
17
        self.handle_failure().await
?5
;
288
289
12
        let object_key = self.get_object_key(object_path);
290
12
        let objects = self.objects.read().await;
291
292
12
        Ok(objects.get(&object_key).map(|obj| 
obj.metadata.clone()7
))
293
34
    }
294
295
    async fn read_object_content(
296
        &self,
297
        object_path: &ObjectPath,
298
        start: i64,
299
        end: Option<i64>,
300
18
    ) -> Result<Vec<u8>, Error> {
301
9
        self.call_counts.read_calls.fetch_add(1, Ordering::Relaxed);
302
9
        self.requests.write().await.push(MockRequest::ReadContent {
303
9
            object_path: object_path.clone(),
304
9
            start,
305
9
            end,
306
9
        });
307
9
308
9
        self.handle_failure().await
?1
;
309
310
8
        let object_key = self.get_object_key(object_path);
311
8
        let objects = self.objects.read().await;
312
313
8
        if let Some(obj) = objects.get(&object_key) {
  Branch (313:16): [True: 8, False: 0]
  Branch (313:16): [Folded - Ignored]
314
8
            let content = &obj.content;
315
8
            if start < 0 {
  Branch (315:16): [True: 0, False: 8]
  Branch (315:16): [Folded - Ignored]
316
0
                return Err(make_err!(
317
0
                    Code::InvalidArgument,
318
0
                    "Start index {} must be non-negative",
319
0
                    start
320
0
                ));
321
8
            }
322
8
323
8
            let start_idx = start as usize;
324
8
            if start_idx > content.len() {
  Branch (324:16): [True: 0, False: 8]
  Branch (324:16): [Folded - Ignored]
325
0
                return Err(make_err!(
326
0
                    Code::OutOfRange,
327
0
                    "Start index {} exceeds content length {}",
328
0
                    start,
329
0
                    content.len()
330
0
                ));
331
8
            }
332
333
            // Calculate end index with validation
334
8
            let end_idx = if let Some(
e2
) = end {
  Branch (334:34): [True: 2, False: 6]
  Branch (334:34): [Folded - Ignored]
335
2
                if e < start {
  Branch (335:20): [True: 0, False: 2]
  Branch (335:20): [Folded - Ignored]
336
0
                    return Err(make_err!(
337
0
                        Code::InvalidArgument,
338
0
                        "End index {} must be greater than or equal to start index {}",
339
0
                        e,
340
0
                        start
341
0
                    ));
342
2
                }
343
2
                core::cmp::min(e as usize, content.len())
344
            } else {
345
6
                content.len()
346
            };
347
348
8
            Ok(content[start_idx..end_idx].to_vec())
349
        } else {
350
0
            Err(make_err!(Code::NotFound, "Object not found"))
351
        }
352
18
    }
353
354
12
    async fn write_object(&self, object_path: &ObjectPath, content: Vec<u8>) -> Result<(), Error> {
355
6
        self.call_counts.write_calls.fetch_add(1, Ordering::Relaxed);
356
6
        self.requests.write().await.push(MockRequest::Write {
357
6
            object_path: object_path.clone(),
358
6
            content_len: content.len(),
359
6
        });
360
6
361
6
        self.handle_failure().await
?1
;
362
5
        self.add_object(object_path, content).await;
363
5
        Ok(())
364
12
    }
365
366
4
    async fn start_resumable_write(&self, object_path: &ObjectPath) -> Result<String, Error> {
367
2
        self.call_counts
368
2
            .start_resumable_calls
369
2
            .fetch_add(1, Ordering::Relaxed);
370
2
        self.requests
371
2
            .write()
372
2
            .await
373
2
            .push(MockRequest::StartResumable {
374
2
                object_path: object_path.clone(),
375
2
            });
376
2
377
2
        self.handle_failure().await
?0
;
378
2
        let upload_id = format!("mock-upload-{}-{}", object_path.bucket, object_path.path);
379
2
        Ok(upload_id)
380
4
    }
381
382
    async fn upload_chunk(
383
        &self,
384
        upload_url: &str,
385
        object_path: &ObjectPath,
386
        data: Vec<u8>,
387
        offset: i64,
388
        end_offset: i64,
389
        is_final: bool,
390
16
    ) -> Result<(), Error> {
391
8
        self.call_counts
392
8
            .upload_chunk_calls
393
8
            .fetch_add(1, Ordering::Relaxed);
394
8
        self.requests.write().await.push(MockRequest::UploadChunk {
395
8
            upload_url: upload_url.to_string(),
396
8
            object_path: object_path.clone(),
397
8
            data_len: data.len(),
398
8
            offset,
399
8
            end_offset,
400
8
            is_final,
401
8
        });
402
8
403
8
        self.handle_failure().await
?0
;
404
405
8
        let object_key = self.get_object_key(object_path);
406
8
        let mut objects = self.objects.write().await;
407
408
        // Get or create the object
409
8
        let mock_object = objects
410
8
            .entry(object_key.clone())
411
8
            .or_insert_with(|| MockObject {
412
2
                metadata: GcsObject {
413
2
                    name: object_path.path.clone(),
414
2
                    bucket: object_path.bucket.clone(),
415
2
                    size: 0,
416
2
                    content_type: DEFAULT_CONTENT_TYPE.to_string(),
417
2
                    update_time: Some(Timestamp {
418
2
                        seconds: self.get_current_timestamp(),
419
2
                        nanos: 0,
420
2
                    }),
421
2
                },
422
2
                content: Vec::new(),
423
2
            });
424
425
        // Handle the chunk data
426
8
        let offset_usize = offset as usize;
427
8
        if mock_object.content.len() < offset_usize + data.len() {
  Branch (427:12): [True: 8, False: 0]
  Branch (427:12): [Folded - Ignored]
428
8
            mock_object.content.resize(offset_usize + data.len(), 0);
429
8
        
}0
430
431
8
        if !data.is_empty() {
  Branch (431:12): [True: 8, False: 0]
  Branch (431:12): [Folded - Ignored]
432
8
            mock_object.content[offset_usize..offset_usize + data.len()].copy_from_slice(&data);
433
8
        
}0
434
435
        // Update metadata if this is the final chunk
436
8
        if is_final {
  Branch (436:12): [True: 2, False: 6]
  Branch (436:12): [Folded - Ignored]
437
2
            mock_object.metadata.size = mock_object.content.len() as i64;
438
2
            mock_object.metadata.update_time = Some(Timestamp {
439
2
                seconds: self.get_current_timestamp(),
440
2
                nanos: 0,
441
2
            });
442
6
        }
443
444
8
        Ok(())
445
16
    }
446
447
    async fn upload_from_reader(
448
        &self,
449
        object_path: &ObjectPath,
450
        reader: &mut DropCloserReadHalf,
451
        upload_id: &str,
452
        max_size: i64,
453
4
    ) -> Result<(), Error> {
454
2
        self.call_counts
455
2
            .upload_from_reader_calls
456
2
            .fetch_add(1, Ordering::Relaxed);
457
2
        self.requests
458
2
            .write()
459
2
            .await
460
2
            .push(MockRequest::UploadFromReader {
461
2
                object_path: object_path.clone(),
462
2
                upload_id: upload_id.to_string(),
463
2
                max_size,
464
2
            });
465
2
466
2
        self.handle_failure().await
?0
;
467
468
        // Read all data from the reader
469
2
        let mut buffer = Vec::new();
470
2
        let mut total_read = 0i64;
471
472
4
        while total_read < max_size {
  Branch (472:15): [True: 3, False: 1]
  Branch (472:15): [Folded - Ignored]
473
3
            let to_read = core::cmp::min((max_size - total_read) as usize, 8192); // 8KB chunks
474
3
            let chunk = reader.consume(Some(to_read)).await
?0
;
475
476
3
            if chunk.is_empty() {
  Branch (476:16): [True: 1, False: 2]
  Branch (476:16): [Folded - Ignored]
477
1
                break;
478
2
            }
479
2
480
2
            buffer.extend_from_slice(&chunk);
481
2
            total_read += chunk.len() as i64;
482
        }
483
484
2
        self.write_object(object_path, buffer).await
?0
;
485
486
2
        Ok(())
487
4
    }
488
489
12
    async fn object_exists(&self, object_path: &ObjectPath) -> Result<bool, Error> {
490
6
        self.call_counts
491
6
            .object_exists_calls
492
6
            .fetch_add(1, Ordering::Relaxed);
493
6
        self.requests.write().await.push(MockRequest::ObjectExists {
494
6
            object_path: object_path.clone(),
495
6
        });
496
6
497
6
        self.handle_failure().await
?0
;
498
499
6
        let object_key = self.get_object_key(object_path);
500
6
        let objects = self.objects.read().await;
501
502
6
        Ok(objects.contains_key(&object_key))
503
12
    }
504
}
505
506
impl Default for MockGcsOperations {
507
0
    fn default() -> Self {
508
0
        Self::new()
509
0
    }
510
}