Coverage Report

Created: 2025-09-18 15:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/ontap_s3_existence_cache_store.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17
use core::time::Duration;
18
use std::borrow::Cow;
19
use std::collections::HashSet;
20
use std::sync::Arc;
21
22
use async_trait::async_trait;
23
use aws_config::BehaviorVersion;
24
use aws_config::default_provider::credentials::DefaultCredentialsChain;
25
use aws_config::provider_config::ProviderConfig;
26
use aws_sdk_s3::Client;
27
use aws_sdk_s3::config::Region;
28
use nativelink_config::stores::{ExperimentalOntapS3Spec, OntapS3ExistenceCacheSpec};
29
use nativelink_error::{Code, Error, ResultExt, make_err};
30
use nativelink_metric::MetricsComponent;
31
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
32
use nativelink_util::common::DigestInfo;
33
use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator};
34
use nativelink_util::instant_wrapper::InstantWrapper;
35
use nativelink_util::metrics_utils::CounterWithTime;
36
use nativelink_util::spawn;
37
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
38
use serde::{Deserialize, Serialize};
39
use tokio::fs;
40
use tokio::sync::RwLock;
41
use tokio::time::{interval, sleep};
42
use tracing::{Level, event};
43
44
use crate::cas_utils::is_zero_digest;
45
use crate::common_s3_utils::TlsClient;
46
use crate::ontap_s3_store::OntapS3Store;
47
48
#[derive(Serialize, Deserialize)]
49
struct CacheFile {
50
    digests: HashSet<DigestInfo>,
51
    timestamp: u64,
52
}
53
54
#[derive(Debug, MetricsComponent)]
55
pub struct OntapS3ExistenceCache<I, NowFn>
56
where
57
    I: InstantWrapper,
58
    NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static,
59
{
60
    inner_store: Store,
61
    s3_client: Arc<Client>,
62
    now_fn: NowFn,
63
    bucket: String,
64
    key_prefix: Option<String>,
65
    index_path: String,
66
    digests: Arc<RwLock<HashSet<DigestInfo>>>,
67
    sync_interval_seconds: u32,
68
    last_sync: Arc<AtomicU64>,
69
    sync_in_progress: Arc<AtomicBool>,
70
71
    #[metric(help = "Number of cache hits")]
72
    cache_hits: CounterWithTime,
73
    #[metric(help = "Number of cache misses")]
74
    cache_misses: CounterWithTime,
75
}
76
77
impl<I, NowFn> OntapS3ExistenceCache<I, NowFn>
78
where
79
    I: InstantWrapper,
80
    NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static,
81
{
82
2
    pub fn new_for_testing(
83
2
        inner_store: Store,
84
2
        s3_client: Arc<Client>,
85
2
        cache_file_path: String,
86
2
        digests: HashSet<DigestInfo>,
87
2
        sync_interval_seconds: u32,
88
2
        now_fn: NowFn,
89
2
    ) -> Arc<Self> {
90
2
        Arc::new(Self {
91
2
            inner_store,
92
2
            s3_client,
93
2
            bucket: "test-bucket".to_string(),
94
2
            key_prefix: None,
95
2
            index_path: cache_file_path,
96
2
            digests: Arc::new(RwLock::new(digests)),
97
2
            sync_interval_seconds,
98
2
            last_sync: Arc::new(AtomicU64::new(
99
2
                std::time::SystemTime::now()
100
2
                    .duration_since(std::time::UNIX_EPOCH)
101
2
                    .unwrap_or_default()
102
2
                    .as_secs(),
103
2
            )),
104
2
            sync_in_progress: Arc::new(AtomicBool::new(false)),
105
2
            now_fn,
106
2
            cache_hits: CounterWithTime::default(),
107
2
            cache_misses: CounterWithTime::default(),
108
2
        })
109
2
    }
110
111
2
    async fn list_objects(&self, test_client: Option<Arc<Client>>) -> Result<Vec<String>, Error> {
112
2
        let client = test_client.unwrap_or_else(|| self.s3_client.clone());
113
114
2
        let mut objects = Vec::new();
115
2
        let mut continuation_token = None;
116
117
        loop {
118
2
            let mut request = client
119
2
                .list_objects_v2()
120
2
                .bucket(&self.bucket)
121
2
                .prefix(self.key_prefix.as_deref().unwrap_or(""))
122
2
                .max_keys(1000); // Process in smaller batches for better responsiveness
123
124
2
            if let Some(token) = &continuation_token {
  Branch (124:20): [True: 0, False: 0]
  Branch (124:20): [Folded - Ignored]
  Branch (124:20): [True: 0, False: 2]
125
0
                request = request.continuation_token(token);
126
2
            }
127
128
            // Add a timeout to avoid hanging indefinitely
129
2
            let result = match tokio::time::timeout(Duration::from_secs(30), request.send()).await {
130
2
                Ok(Ok(response)) => response,
131
0
                Ok(Err(e)) => {
132
                    // Log the error but continue
133
0
                    event!(Level::ERROR, ?e, "Error listing objects in ONTAP S3");
134
0
                    return Err(make_err!(Code::Internal, "Failed to list objects: {}", e));
135
                }
136
                Err(_) => {
137
                    // Timeout occurred
138
0
                    event!(Level::ERROR, "Timeout listing objects in ONTAP S3");
139
0
                    return Err(make_err!(Code::Internal, "Timeout listing objects"));
140
                }
141
            };
142
143
2
            if let Some(contents) = result.contents {
  Branch (143:20): [True: 0, False: 0]
  Branch (143:20): [Folded - Ignored]
  Branch (143:20): [True: 2, False: 0]
144
7
                for object in contents {
145
5
                    if let Some(key) = object.key {
  Branch (145:28): [True: 0, False: 0]
  Branch (145:28): [Folded - Ignored]
  Branch (145:28): [True: 5, False: 0]
146
5
                        objects.push(key);
147
5
                    
}
148
                }
149
0
            }
150
151
2
            match result.next_continuation_token {
152
0
                Some(token) => {
153
0
                    continuation_token = Some(token);
154
                    // Yield to other tasks periodically to avoid monopolizing the runtime
155
0
                    tokio::task::yield_now().await;
156
                }
157
2
                None => break,
158
            }
159
        }
160
2
        event!(Level::INFO, "Listed {} objects from S3", objects.len());
161
2
        event!(Level::INFO, "Used prefix: {:?}", self.key_prefix);
162
2
        Ok(objects)
163
2
    }
164
165
5
    async fn parse_key_to_digest(&self, key: &str) -> Option<DigestInfo> {
166
        // Remove the prefix if it exists
167
5
        let relative_key = key
168
5
            .strip_prefix(self.key_prefix.as_deref().unwrap_or(""))
169
5
            .unwrap_or(key)
170
5
            .trim_start_matches('/');
171
172
        // Skip the directory entry itself
173
5
        if relative_key.is_empty() {
  Branch (173:12): [True: 0, False: 0]
  Branch (173:12): [Folded - Ignored]
  Branch (173:12): [True: 0, False: 5]
174
0
            return None;
175
5
        }
176
177
5
        let parts: Vec<&str> = relative_key.split('-').collect();
178
179
5
        if parts.len() != 2 {
  Branch (179:12): [True: 0, False: 0]
  Branch (179:12): [Folded - Ignored]
  Branch (179:12): [True: 0, False: 5]
180
0
            return None;
181
5
        }
182
183
5
        match (hex::decode(parts[0]), parts[1].parse::<u64>()) {
184
5
            (Ok(hash), Ok(size)) => {
185
5
                if hash.len() == 32 {
  Branch (185:20): [True: 0, False: 0]
  Branch (185:20): [Folded - Ignored]
  Branch (185:20): [True: 5, False: 0]
186
                    // Clone the hash before converting
187
5
                    let hash_array: [u8; 32] = hash.try_into().unwrap();
188
5
                    let digest = DigestInfo::new(hash_array, size);
189
190
5
                    Some(digest)
191
                } else {
192
0
                    None
193
                }
194
            }
195
0
            _ => None,
196
        }
197
5
    }
198
199
2
    pub async fn run_sync(&self) -> Result<(), Error> {
200
        // Only run if not already in progress
201
2
        if self
  Branch (201:12): [True: 0, False: 0]
  Branch (201:12): [Folded - Ignored]
  Branch (201:12): [True: 0, False: 2]
202
2
            .sync_in_progress
203
2
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
204
2
            .is_err()
205
        {
206
            // Sync already in progress, just return
207
0
            return Ok(());
208
2
        }
209
210
        // Use a finally block equivalent to ensure we clear the flag even if there's an error
211
2
        let result = self.sync_cache(None).await;
212
213
        // Clear the in-progress flag
214
2
        self.sync_in_progress.store(false, Ordering::SeqCst);
215
216
2
        result
217
2
    }
218
219
2
    async fn sync_cache(&self, test_client: Option<Arc<Client>>) -> Result<(), Error> {
220
2
        let client = test_client.unwrap_or_else(|| self.s3_client.clone());
221
2
        let current_time = (self.now_fn)().unix_timestamp();
222
223
        // Load existing cache to compare against
224
2
        let mut existing_digests = HashSet::new();
225
2
        if let Ok(contents) = fs::read_to_string(&self.index_path).await {
  Branch (225:16): [True: 0, False: 0]
  Branch (225:16): [Folded - Ignored]
  Branch (225:16): [True: 0, False: 2]
226
0
            if let Ok(cache_file) = serde_json::from_str::<CacheFile>(&contents) {
  Branch (226:20): [True: 0, False: 0]
  Branch (226:20): [Folded - Ignored]
  Branch (226:20): [True: 0, False: 0]
227
0
                existing_digests = cache_file.digests;
228
0
            }
229
2
        }
230
231
        // Get all objects
232
2
        let objects = match self.list_objects(Some(client)).await {
233
2
            Ok(objects) => objects,
234
0
            Err(e) => {
235
0
                event!(Level::ERROR, ?e, "Failed to list objects during cache sync");
236
                // On error, keep using the existing cache
237
0
                return Err(e);
238
            }
239
        };
240
241
2
        let mut new_digests = HashSet::with_capacity(objects.len());
242
243
        // Process objects in chunks to avoid long processing times
244
2
        for chunk in objects.chunks(1000) {
245
7
            for obj_key in chunk {
246
5
                if let Some(digest) = self.parse_key_to_digest(obj_key).await {
  Branch (246:24): [True: 0, False: 0]
  Branch (246:24): [Folded - Ignored]
  Branch (246:24): [True: 5, False: 0]
247
5
                    new_digests.insert(digest);
248
5
                
}
249
            }
250
            // Yield to other tasks periodically
251
2
            tokio::task::yield_now().await;
252
        }
253
254
        // Only write the cache file if something changed
255
2
        if new_digests != existing_digests {
  Branch (255:12): [True: 0, False: 0]
  Branch (255:12): [Folded - Ignored]
  Branch (255:12): [True: 2, False: 0]
256
2
            let cache_file = CacheFile {
257
2
                digests: new_digests.clone(),
258
2
                timestamp: current_time,
259
2
            };
260
261
2
            let json = serde_json::to_string(&cache_file)
262
2
                .map_err(|e| make_err!(Code::Internal, "Failed to serialize cache: {}", e))?;
263
264
2
            fs::write(&self.index_path, json)
265
2
                .await
266
2
                .map_err(|e| make_err!(Code::Internal, "Failed to write cache file: {}", e))?;
267
0
        }
268
269
        // Update the in-memory cache - this can be done regardless of file write status
270
2
        *self.digests.write().await = new_digests;
271
2
        self.last_sync.store(current_time, Ordering::SeqCst);
272
273
2
        Ok(())
274
2
    }
275
276
3
    pub async fn new(spec: &OntapS3ExistenceCacheSpec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
277
3
        let inner_spec = &spec.backend;
278
3
        let inner_store = Arc::new(OntapS3Store::new(inner_spec, now_fn.clone()).await?);
279
3
        let inner_store = Store::new((*inner_store).clone());
280
3
        let s3_client = create_s3_client(inner_spec).await?;
281
282
3
        let digests = Arc::new(RwLock::new(HashSet::new()));
283
3
        let last_sync = Arc::new(AtomicU64::new(0));
284
3
        let sync_in_progress = Arc::new(AtomicBool::new(false));
285
286
        // Create the cache instance
287
3
        let cache = Arc::new(Self {
288
3
            inner_store,
289
3
            s3_client: Arc::new(s3_client),
290
3
            bucket: inner_spec.bucket.clone(),
291
3
            key_prefix: inner_spec.common.key_prefix.clone(),
292
3
            index_path: spec.index_path.clone(),
293
3
            digests: digests.clone(),
294
3
            sync_interval_seconds: spec.sync_interval_seconds,
295
3
            last_sync: last_sync.clone(),
296
3
            now_fn,
297
3
            sync_in_progress,
298
3
            cache_hits: CounterWithTime::default(),
299
3
            cache_misses: CounterWithTime::default(),
300
3
        });
301
302
        // Try to load existing cache file
303
3
        if let Ok(contents) = fs::read_to_string(&spec.index_path).await {
  Branch (303:16): [True: 0, False: 3]
  Branch (303:16): [Folded - Ignored]
304
0
            let cache_file = serde_json::from_str::<CacheFile>(&contents)
305
0
                .map_err(|e| make_err!(Code::Internal, "Failed to parse cache file: {}", e))?;
306
0
            *cache.digests.write().await = cache_file.digests;
307
0
            cache
308
0
                .last_sync
309
0
                .store(cache_file.timestamp, Ordering::SeqCst);
310
3
        }
311
312
        // Start the background sync task
313
3
        let cache_clone = cache.clone();
314
3
        let _handle = spawn!("ontap_s3_existence_cache_sync", async move {
315
            // Wait a bit before starting the sync to allow the service to initialize
316
0
            sleep(Duration::from_secs(5)).await;
317
318
            // Run an initial sync if the cache is empty
319
0
            if cache_clone.digests.read().await.is_empty() {
  Branch (319:16): [True: 0, False: 0]
  Branch (319:16): [Folded - Ignored]
320
0
                event!(Level::INFO, "Running initial cache sync");
321
0
                if let Err(e) = cache_clone.run_sync().await {
  Branch (321:24): [True: 0, False: 0]
  Branch (321:24): [Folded - Ignored]
322
0
                    event!(Level::ERROR, ?e, "Initial cache sync error");
323
0
                }
324
0
            }
325
326
            // Set up periodic background sync
327
0
            let mut interval = interval(Duration::from_secs(u64::from(
328
0
                cache_clone.sync_interval_seconds,
329
            )));
330
            loop {
331
0
                interval.tick().await;
332
0
                event!(Level::INFO, "Starting periodic cache sync");
333
0
                if let Err(e) = cache_clone.run_sync().await {
  Branch (333:24): [True: 0, False: 0]
  Branch (333:24): [Folded - Ignored]
334
0
                    event!(Level::ERROR, ?e, "Background sync error");
335
                } else {
336
0
                    event!(Level::INFO, "Completed periodic cache sync");
337
                }
338
            }
339
        });
340
341
3
        Ok(cache)
342
3
    }
343
}
344
345
3
async fn create_s3_client(spec: &ExperimentalOntapS3Spec) -> Result<Client, Error> {
346
3
    let http_client = TlsClient::new(&spec.common);
347
3
    let credentials_provider = DefaultCredentialsChain::builder()
348
3
        .configure(
349
3
            ProviderConfig::without_region()
350
3
                .with_region(Some(Region::new(Cow::Owned(spec.vserver_name.clone()))))
351
3
                .with_http_client(http_client.clone()),
352
3
        )
353
3
        .build()
354
3
        .await;
355
356
3
    let config = aws_sdk_s3::Config::builder()
357
3
        .credentials_provider(credentials_provider)
358
3
        .endpoint_url(&spec.endpoint)
359
3
        .region(Region::new(spec.vserver_name.clone()))
360
3
        .force_path_style(true)
361
3
        .behavior_version(BehaviorVersion::v2025_01_17())
362
3
        .build();
363
364
3
    Ok(Client::from_conf(config))
365
3
}
366
367
#[async_trait]
368
impl<I, NowFn> StoreDriver for OntapS3ExistenceCache<I, NowFn>
369
where
370
    I: InstantWrapper,
371
    NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static,
372
{
373
    async fn has_with_results(
374
        self: Pin<&Self>,
375
        keys: &[StoreKey<'_>],
376
        results: &mut [Option<u64>],
377
10
    ) -> Result<(), Error> {
378
5
        let cache = self.digests.read().await;
379
380
7
        for (key, result) in keys.iter().zip(results.iter_mut()) {
381
            // Handle zero digest case
382
7
            if is_zero_digest(key.borrow()) {
  Branch (382:16): [True: 1, False: 1]
  Branch (382:16): [Folded - Ignored]
  Branch (382:16): [True: 0, False: 5]
383
1
                *result = Some(0);
384
1
                self.cache_hits.inc();
385
1
                continue;
386
6
            }
387
388
6
            let digest = key.borrow().into_digest();
389
390
            // If not in cache, definitely doesn't exist
391
6
            if !cache.contains(&digest) {
  Branch (391:16): [True: 1, False: 0]
  Branch (391:16): [Folded - Ignored]
  Branch (391:16): [True: 0, False: 5]
392
1
                *result = None;
393
1
                self.cache_misses.inc();
394
1
                continue;
395
5
            }
396
            // If in cache, check actual store
397
5
            match self.inner_store.has(key.borrow()).await {
398
5
                Ok(size) => {
399
5
                    *result = size;
400
5
                    self.cache_hits.inc();
401
5
                }
402
0
                Err(e) => {
403
0
                    return Err(e);
404
                }
405
            }
406
        }
407
408
5
        Ok(())
409
10
    }
410
411
    async fn get_part(
412
        self: Pin<&Self>,
413
        key: StoreKey<'_>,
414
        writer: &mut DropCloserWriteHalf,
415
        offset: u64,
416
        length: Option<u64>,
417
4
    ) -> Result<(), Error> {
418
2
        if is_zero_digest(key.borrow()) {
  Branch (418:12): [True: 1, False: 1]
  Branch (418:12): [Folded - Ignored]
  Branch (418:12): [True: 0, False: 0]
419
1
            writer.send_eof().err_tip(|| "Failed to send zero EOF")?;
420
1
            return Ok(());
421
1
        }
422
423
        // Only pass through if in cache
424
1
        if !self
  Branch (424:12): [True: 1, False: 0]
  Branch (424:12): [Folded - Ignored]
  Branch (424:12): [True: 0, False: 0]
425
1
            .digests
426
1
            .read()
427
1
            .await
428
1
            .contains(&key.borrow().into_digest())
429
        {
430
1
            return Err(make_err!(Code::NotFound, "Object not in cache"));
431
0
        }
432
433
0
        self.inner_store.get_part(key, writer, offset, length).await
434
4
    }
435
436
    async fn update(
437
        self: Pin<&Self>,
438
        key: StoreKey<'_>,
439
        reader: DropCloserReadHalf,
440
        size_info: UploadSizeInfo,
441
0
    ) -> Result<(), Error> {
442
0
        let key_owned = key.into_owned();
443
0
        let result = self
444
0
            .inner_store
445
0
            .update(key_owned.clone(), reader, size_info)
446
0
            .await;
447
0
        if result.is_ok() {
  Branch (447:12): [True: 0, False: 0]
  Branch (447:12): [Folded - Ignored]
  Branch (447:12): [True: 0, False: 0]
448
0
            self.digests.write().await.insert(key_owned.into_digest());
449
0
        }
450
0
        result
451
0
    }
452
453
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
454
0
        self
455
0
    }
456
457
0
    fn as_any(&self) -> &(dyn core::any::Any + Send + Sync + 'static) {
458
0
        self
459
0
    }
460
461
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Send + Sync + 'static> {
462
0
        self
463
0
    }
464
}
465
466
#[async_trait]
467
impl<I, NowFn> HealthStatusIndicator for OntapS3ExistenceCache<I, NowFn>
468
where
469
    I: InstantWrapper,
470
    NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static,
471
{
472
0
    fn get_name(&self) -> &'static str {
473
0
        "OntapS3ExistenceCache"
474
0
    }
475
476
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
477
0
        StoreDriver::check_health(Pin::new(self), namespace).await
478
0
    }
479
}