Coverage Report

Created: 2025-10-30 00:14

/build/source/nativelink-store/src/ontap_s3_existence_cache_store.rs

// Copyright 2025 The NativeLink Authors. All rights reserved.
//
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    See LICENSE file for details
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::fmt::Debug;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use core::time::Duration;
use std::borrow::Cow;
use std::collections::HashSet;
use std::sync::{Arc, Weak};

use async_trait::async_trait;
use aws_config::BehaviorVersion;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_config::provider_config::ProviderConfig;
use aws_sdk_s3::Client;
use aws_sdk_s3::config::Region;
use nativelink_config::stores::{ExperimentalOntapS3Spec, OntapS3ExistenceCacheSpec};
use nativelink_error::{Code, Error, ResultExt, make_err};
use nativelink_metric::MetricsComponent;
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::common::DigestInfo;
use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator};
use nativelink_util::instant_wrapper::InstantWrapper;
use nativelink_util::metrics_utils::CounterWithTime;
use nativelink_util::spawn;
use nativelink_util::store_trait::{
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
};
use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::sync::RwLock;
use tokio::time::{interval, sleep};
use tracing::{Level, debug, event};

use crate::cas_utils::is_zero_digest;
use crate::common_s3_utils::TlsClient;
use crate::ontap_s3_store::OntapS3Store;

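// On-disk snapshot of the existence cache: the set of digests observed in
// the bucket plus the unix timestamp (seconds) of the sync that produced it.
// Serialized with serde_json; the exact digest encoding is whatever
// `DigestInfo`'s own serde implementation emits.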
#[derive(Serialize, Deserialize)]
struct CacheFile {
    digests: HashSet<DigestInfo>,
    timestamp: u64,
}

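/// Store wrapper that caches which digests exist in an ONTAP S3 bucket.
/// Existence checks are answered from an in-memory digest set that is
/// rebuilt periodically by listing the bucket and persisted to `index_path`.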
#[derive(MetricsComponent)]
pub struct OntapS3ExistenceCache<I, NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static,
{
    inner_store: Store,
    s3_client: Arc<Client>,
    now_fn: NowFn,
    bucket: String,
    key_prefix: Option<String>,
    index_path: String,
    digests: Arc<RwLock<HashSet<DigestInfo>>>,
    sync_interval_seconds: u32,
    last_sync: Arc<AtomicU64>,
    sync_in_progress: Arc<AtomicBool>,

    #[metric(help = "Number of cache hits")]
    cache_hits: CounterWithTime,
    #[metric(help = "Number of cache misses")]
    cache_misses: CounterWithTime,
}

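// Removal callback handed to the inner store. Holds only a `Weak` reference
// so the callback does not keep the cache alive after it is dropped.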
struct OntapS3CacheCallback<I, NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static,
{
    cache: Weak<OntapS3ExistenceCache<I, NowFn>>,
}

impl<I, NowFn> Debug for OntapS3CacheCallback<I, NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static,
{
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("OntapS3CacheCallback")
            .field("cache", &self.cache)
            .finish()
    }
}

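// Forward inner-store removals to the cache (if it is still alive) so
// deleted objects also disappear from the in-memory digest set.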
impl<I, NowFn> RemoveItemCallback for OntapS3CacheCallback<I, NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static,
{
    fn callback<'a>(
        &'a self,
        store_key: StoreKey<'a>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        let cache = self.cache.upgrade();
        if let Some(local_cache) = cache {
            Box::pin(async move {
                local_cache.callback(store_key).await;
            })
        } else {
            debug!("Cache dropped, so not doing callback");
            Box::pin(async {})
        }
    }
}

impl<I, NowFn> Debug for OntapS3ExistenceCache<I, NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static,
{
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("OntapS3ExistenceCache")
            .field("inner_store", &self.inner_store)
            .field("s3_client", &self.s3_client)
            .field("bucket", &self.bucket)
            .field("key_prefix", &self.key_prefix)
            .field("index_path", &self.index_path)
            .field("digests", &self.digests)
            .field("sync_interval_seconds", &self.sync_interval_seconds)
            .field("last_sync", &self.last_sync)
            .field("sync_in_progress", &self.sync_in_progress)
            .field("cache_hits", &self.cache_hits)
            .field("cache_misses", &self.cache_misses)
            .finish()
    }
}

impl<I, NowFn> OntapS3ExistenceCache<I, NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static,
{
    pub fn new_for_testing(
        inner_store: Store,
        s3_client: Arc<Client>,
        cache_file_path: String,
        digests: HashSet<DigestInfo>,
        sync_interval_seconds: u32,
        now_fn: NowFn,
    ) -> Arc<Self> {
        Arc::new(Self {
            inner_store,
            s3_client,
            bucket: "test-bucket".to_string(),
            key_prefix: None,
            index_path: cache_file_path,
            digests: Arc::new(RwLock::new(digests)),
            sync_interval_seconds,
            last_sync: Arc::new(AtomicU64::new(
                std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs(),
            )),
            sync_in_progress: Arc::new(AtomicBool::new(false)),
            now_fn,
            cache_hits: CounterWithTime::default(),
            cache_misses: CounterWithTime::default(),
        })
    }

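    // Page through ListObjectsV2 and collect every key under `key_prefix`.
    // `test_client` lets tests substitute their own client.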
    async fn list_objects(&self, test_client: Option<Arc<Client>>) -> Result<Vec<String>, Error> {
        let client = test_client.unwrap_or_else(|| self.s3_client.clone());

        let mut objects = Vec::new();
        let mut continuation_token = None;

        loop {
            let mut request = client
                .list_objects_v2()
                .bucket(&self.bucket)
                .prefix(self.key_prefix.as_deref().unwrap_or(""))
                .max_keys(1000); // Process in smaller batches for better responsiveness

            if let Some(token) = &continuation_token {
                request = request.continuation_token(token);
            }

            // Add a timeout to avoid hanging indefinitely
            let result = match tokio::time::timeout(Duration::from_secs(30), request.send()).await {
                Ok(Ok(response)) => response,
                Ok(Err(e)) => {
                    // Log the error and propagate it
                    event!(Level::ERROR, ?e, "Error listing objects in ONTAP S3");
                    return Err(make_err!(Code::Internal, "Failed to list objects: {}", e));
                }
                Err(_) => {
                    // Timeout occurred
                    event!(Level::ERROR, "Timeout listing objects in ONTAP S3");
                    return Err(make_err!(Code::Internal, "Timeout listing objects"));
                }
            };

            if let Some(contents) = result.contents {
                for object in contents {
                    if let Some(key) = object.key {
                        objects.push(key);
                    }
                }
            }

            match result.next_continuation_token {
                Some(token) => {
                    continuation_token = Some(token);
                    // Yield to other tasks periodically to avoid monopolizing the runtime
                    tokio::task::yield_now().await;
                }
                None => break,
            }
        }
        event!(Level::INFO, "Listed {} objects from S3", objects.len());
        event!(Level::INFO, "Used prefix: {:?}", self.key_prefix);
        Ok(objects)
    }

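    // Parse an object key of the form "<64-hex-char hash>-<decimal size>"
    // (relative to `key_prefix`) into a `DigestInfo`; returns `None` for
    // keys that do not match.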
    async fn parse_key_to_digest(&self, key: &str) -> Option<DigestInfo> {
        // Remove the prefix if it exists
        let relative_key = key
            .strip_prefix(self.key_prefix.as_deref().unwrap_or(""))
            .unwrap_or(key)
            .trim_start_matches('/');

        // Skip the directory entry itself
        if relative_key.is_empty() {
            return None;
        }

        let parts: Vec<&str> = relative_key.split('-').collect();

        if parts.len() != 2 {
            return None;
        }

        match (hex::decode(parts[0]), parts[1].parse::<u64>()) {
            (Ok(hash), Ok(size)) => {
                if hash.len() == 32 {
                    // Length was checked above, so this conversion cannot fail
                    let hash_array: [u8; 32] = hash.try_into().unwrap();
                    let digest = DigestInfo::new(hash_array, size);

                    Some(digest)
                } else {
                    None
                }
            }
            _ => None,
        }
    }

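    // Trigger a sync unless one is already running; concurrent callers
    // return `Ok(())` immediately.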
    pub async fn run_sync(&self) -> Result<(), Error> {
        // Only run if not already in progress
        if self
            .sync_in_progress
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            // Sync already in progress, just return
            return Ok(());
        }

        // Use a finally block equivalent to ensure we clear the flag even if there's an error
        let result = self.sync_cache(None).await;

        // Clear the in-progress flag
        self.sync_in_progress.store(false, Ordering::SeqCst);

        result
    }

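    // Rebuild the digest set from a full bucket listing: load the previous
    // index for comparison, list all objects, parse keys into digests, and
    // rewrite the index file only if the set actually changed.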
    async fn sync_cache(&self, test_client: Option<Arc<Client>>) -> Result<(), Error> {
        let client = test_client.unwrap_or_else(|| self.s3_client.clone());
        let current_time = (self.now_fn)().unix_timestamp();

        // Load existing cache to compare against
        let mut existing_digests = HashSet::new();
        if let Ok(contents) = fs::read_to_string(&self.index_path).await {
            if let Ok(cache_file) = serde_json::from_str::<CacheFile>(&contents) {
                existing_digests = cache_file.digests;
            }
        }

        // Get all objects
        let objects = match self.list_objects(Some(client)).await {
            Ok(objects) => objects,
            Err(e) => {
                event!(Level::ERROR, ?e, "Failed to list objects during cache sync");
                // On error, keep using the existing cache
                return Err(e);
            }
        };

        let mut new_digests = HashSet::with_capacity(objects.len());

        // Process objects in chunks to avoid long processing times
        for chunk in objects.chunks(1000) {
            for obj_key in chunk {
                if let Some(digest) = self.parse_key_to_digest(obj_key).await {
                    new_digests.insert(digest);
                }
            }
            // Yield to other tasks periodically
            tokio::task::yield_now().await;
        }

        // Only write the cache file if something changed
        if new_digests != existing_digests {
            let cache_file = CacheFile {
                digests: new_digests.clone(),
                timestamp: current_time,
            };

            let json = serde_json::to_string(&cache_file)
                .map_err(|e| make_err!(Code::Internal, "Failed to serialize cache: {}", e))?;

            fs::write(&self.index_path, json)
                .await
                .map_err(|e| make_err!(Code::Internal, "Failed to write cache file: {}", e))?;
        }

        // Update the in-memory cache - this can be done regardless of file write status
        *self.digests.write().await = new_digests;
        self.last_sync.store(current_time, Ordering::SeqCst);

        Ok(())
    }

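    // Build the cache around a new `OntapS3Store`: create the S3 client,
    // register the removal callback, load any persisted index, and spawn
    // the background task that keeps the digest set fresh.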
    pub async fn new(spec: &OntapS3ExistenceCacheSpec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
        let inner_spec = &spec.backend;
        let inner_store = Arc::new(OntapS3Store::new(inner_spec, now_fn.clone()).await?);
        let inner_store = Store::new((*inner_store).clone());
        let s3_client = create_s3_client(inner_spec).await?;

        let digests = Arc::new(RwLock::new(HashSet::new()));
        let last_sync = Arc::new(AtomicU64::new(0));
        let sync_in_progress = Arc::new(AtomicBool::new(false));

        // Create the cache instance
        let cache = Arc::new(Self {
            inner_store,
            s3_client: Arc::new(s3_client),
            bucket: inner_spec.bucket.clone(),
            key_prefix: inner_spec.common.key_prefix.clone(),
            index_path: spec.index_path.clone(),
            digests: digests.clone(),
            sync_interval_seconds: spec.sync_interval_seconds,
            last_sync: last_sync.clone(),
            now_fn,
            sync_in_progress,
            cache_hits: CounterWithTime::default(),
            cache_misses: CounterWithTime::default(),
        });

        let other_ref = Arc::downgrade(&cache);
        cache
            .inner_store
            .register_remove_callback(Arc::new(OntapS3CacheCallback { cache: other_ref }))?;

        // Try to load existing cache file
        if let Ok(contents) = fs::read_to_string(&spec.index_path).await {
            let cache_file = serde_json::from_str::<CacheFile>(&contents)
                .map_err(|e| make_err!(Code::Internal, "Failed to parse cache file: {}", e))?;
            *cache.digests.write().await = cache_file.digests;
            cache
                .last_sync
                .store(cache_file.timestamp, Ordering::SeqCst);
        }

        // Start the background sync task
        let cache_clone = cache.clone();
        let _handle = spawn!("ontap_s3_existence_cache_sync", async move {
            // Wait a bit before starting the sync to allow the service to initialize
            sleep(Duration::from_secs(5)).await;

            // Run an initial sync if the cache is empty
            if cache_clone.digests.read().await.is_empty() {
                event!(Level::INFO, "Running initial cache sync");
                if let Err(e) = cache_clone.run_sync().await {
                    event!(Level::ERROR, ?e, "Initial cache sync error");
                }
            }

            // Set up periodic background sync
            let mut interval = interval(Duration::from_secs(u64::from(
                cache_clone.sync_interval_seconds,
            )));
            loop {
                interval.tick().await;
                event!(Level::INFO, "Starting periodic cache sync");
                if let Err(e) = cache_clone.run_sync().await {
                    event!(Level::ERROR, ?e, "Background sync error");
                } else {
                    event!(Level::INFO, "Completed periodic cache sync");
                }
            }
        });

        Ok(cache)
    }
}

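// Build an S3 client pointed at the ONTAP endpoint. Path-style addressing
// is forced and the vserver name stands in as the region.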
async fn create_s3_client(spec: &ExperimentalOntapS3Spec) -> Result<Client, Error> {
    let http_client = TlsClient::new(&spec.common);
    let credentials_provider = DefaultCredentialsChain::builder()
        .configure(
            ProviderConfig::without_region()
                .with_region(Some(Region::new(Cow::Owned(spec.vserver_name.clone()))))
                .with_http_client(http_client.clone()),
        )
        .build()
        .await;

    let config = aws_sdk_s3::Config::builder()
        .credentials_provider(credentials_provider)
        .endpoint_url(&spec.endpoint)
        .region(Region::new(spec.vserver_name.clone()))
        .force_path_style(true)
        .behavior_version(BehaviorVersion::v2025_08_07())
        .build();

    Ok(Client::from_conf(config))
}

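// `StoreDriver` implementation: existence checks are answered from the
// in-memory digest set, falling back to the inner store only for entries
// the cache believes exist.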
#[async_trait]
impl<I, NowFn> StoreDriver for OntapS3ExistenceCache<I, NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static,
{
    async fn has_with_results(
        self: Pin<&Self>,
        keys: &[StoreKey<'_>],
        results: &mut [Option<u64>],
    ) -> Result<(), Error> {
        let cache = self.digests.read().await;

        for (key, result) in keys.iter().zip(results.iter_mut()) {
            // Handle zero digest case
            if is_zero_digest(key.borrow()) {
                *result = Some(0);
                self.cache_hits.inc();
                continue;
            }

            let digest = key.borrow().into_digest();

            // If not in cache, definitely doesn't exist
            if !cache.contains(&digest) {
                *result = None;
                self.cache_misses.inc();
                continue;
            }
            // If in cache, check actual store
            match self.inner_store.has(key.borrow()).await {
                Ok(size) => {
                    *result = size;
                    self.cache_hits.inc();
                }
                Err(e) => {
                    return Err(e);
                }
            }
        }

        Ok(())
    }

    async fn get_part(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        writer: &mut DropCloserWriteHalf,
        offset: u64,
        length: Option<u64>,
    ) -> Result<(), Error> {
        if is_zero_digest(key.borrow()) {
            writer.send_eof().err_tip(|| "Failed to send zero EOF")?;
            return Ok(());
        }

        // Only pass through if in cache
        if !self
            .digests
            .read()
            .await
            .contains(&key.borrow().into_digest())
        {
            return Err(make_err!(Code::NotFound, "Object not in cache"));
        }

        self.inner_store.get_part(key, writer, offset, length).await
    }

    async fn update(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        reader: DropCloserReadHalf,
        size_info: UploadSizeInfo,
    ) -> Result<(), Error> {
        let key_owned = key.into_owned();
        let result = self
            .inner_store
            .update(key_owned.clone(), reader, size_info)
            .await;
        if result.is_ok() {
            self.digests.write().await.insert(key_owned.into_digest());
        }
        result
    }

    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }

    fn as_any(&self) -> &(dyn core::any::Any + Send + Sync + 'static) {
        self
    }

    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Send + Sync + 'static> {
        self
    }

    fn register_remove_callback(
        self: Arc<Self>,
        callback: Arc<dyn RemoveItemCallback>,
    ) -> Result<(), Error> {
        self.inner_store.register_remove_callback(callback)
    }
}

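// When the inner store removes an item, drop its digest from the in-memory
// set so later existence checks report a miss.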
impl<I, NowFn> RemoveItemCallback for OntapS3ExistenceCache<I, NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static,
{
    fn callback<'a>(
        &'a self,
        store_key: StoreKey<'a>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        let digest = store_key.borrow().into_digest();
        Box::pin(async move {
            self.digests.write().await.remove(&digest);
        })
    }
}

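// Health checks delegate to the default `StoreDriver::check_health`.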
#[async_trait]
impl<I, NowFn> HealthStatusIndicator for OntapS3ExistenceCache<I, NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static,
{
    fn get_name(&self) -> &'static str {
        "OntapS3ExistenceCache"
    }

    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
        StoreDriver::check_health(Pin::new(self), namespace).await
    }
}