/build/source/nativelink-store/src/ontap_s3_existence_cache_store.rs
Line | Count | Source |
1 | | // Copyright 2025 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::pin::Pin; |
16 | | use core::sync::atomic::{AtomicBool, AtomicU64, Ordering}; |
17 | | use core::time::Duration; |
18 | | use std::borrow::Cow; |
19 | | use std::collections::HashSet; |
20 | | use std::sync::Arc; |
21 | | |
22 | | use async_trait::async_trait; |
23 | | use aws_config::BehaviorVersion; |
24 | | use aws_config::default_provider::credentials::DefaultCredentialsChain; |
25 | | use aws_config::provider_config::ProviderConfig; |
26 | | use aws_sdk_s3::Client; |
27 | | use aws_sdk_s3::config::Region; |
28 | | use nativelink_config::stores::{ExperimentalOntapS3Spec, OntapS3ExistenceCacheSpec}; |
29 | | use nativelink_error::{Code, Error, ResultExt, make_err}; |
30 | | use nativelink_metric::MetricsComponent; |
31 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
32 | | use nativelink_util::common::DigestInfo; |
33 | | use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator}; |
34 | | use nativelink_util::instant_wrapper::InstantWrapper; |
35 | | use nativelink_util::metrics_utils::CounterWithTime; |
36 | | use nativelink_util::spawn; |
37 | | use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo}; |
38 | | use serde::{Deserialize, Serialize}; |
39 | | use tokio::fs; |
40 | | use tokio::sync::RwLock; |
41 | | use tokio::time::{interval, sleep}; |
42 | | use tracing::{Level, event}; |
43 | | |
44 | | use crate::cas_utils::is_zero_digest; |
45 | | use crate::common_s3_utils::TlsClient; |
46 | | use crate::ontap_s3_store::OntapS3Store; |
47 | | |
48 | | #[derive(Serialize, Deserialize)] |
49 | | struct CacheFile { |
50 | | digests: HashSet<DigestInfo>, |
51 | | timestamp: u64, |
52 | | } |
53 | | |
54 | | #[derive(Debug, MetricsComponent)] |
55 | | pub struct OntapS3ExistenceCache<I, NowFn> |
56 | | where |
57 | | I: InstantWrapper, |
58 | | NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static, |
59 | | { |
60 | | inner_store: Store, |
61 | | s3_client: Arc<Client>, |
62 | | now_fn: NowFn, |
63 | | bucket: String, |
64 | | key_prefix: Option<String>, |
65 | | index_path: String, |
66 | | digests: Arc<RwLock<HashSet<DigestInfo>>>, |
67 | | sync_interval_seconds: u32, |
68 | | last_sync: Arc<AtomicU64>, |
69 | | sync_in_progress: Arc<AtomicBool>, |
70 | | |
71 | | #[metric(help = "Number of cache hits")] |
72 | | cache_hits: CounterWithTime, |
73 | | #[metric(help = "Number of cache misses")] |
74 | | cache_misses: CounterWithTime, |
75 | | } |
76 | | |
77 | | impl<I, NowFn> OntapS3ExistenceCache<I, NowFn> |
78 | | where |
79 | | I: InstantWrapper, |
80 | | NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static, |
81 | | { |
82 | 2 | pub fn new_for_testing( |
83 | 2 | inner_store: Store, |
84 | 2 | s3_client: Arc<Client>, |
85 | 2 | cache_file_path: String, |
86 | 2 | digests: HashSet<DigestInfo>, |
87 | 2 | sync_interval_seconds: u32, |
88 | 2 | now_fn: NowFn, |
89 | 2 | ) -> Arc<Self> { |
90 | 2 | Arc::new(Self { |
91 | 2 | inner_store, |
92 | 2 | s3_client, |
93 | 2 | bucket: "test-bucket".to_string(), |
94 | 2 | key_prefix: None, |
95 | 2 | index_path: cache_file_path, |
96 | 2 | digests: Arc::new(RwLock::new(digests)), |
97 | 2 | sync_interval_seconds, |
98 | 2 | last_sync: Arc::new(AtomicU64::new( |
99 | 2 | std::time::SystemTime::now() |
100 | 2 | .duration_since(std::time::UNIX_EPOCH) |
101 | 2 | .unwrap_or_default() |
102 | 2 | .as_secs(), |
103 | 2 | )), |
104 | 2 | sync_in_progress: Arc::new(AtomicBool::new(false)), |
105 | 2 | now_fn, |
106 | 2 | cache_hits: CounterWithTime::default(), |
107 | 2 | cache_misses: CounterWithTime::default(), |
108 | 2 | }) |
109 | 2 | } |
110 | | |
111 | 2 | async fn list_objects(&self, test_client: Option<Arc<Client>>) -> Result<Vec<String>, Error> { |
112 | 2 | let client = test_client.unwrap_or_else(|| self.s3_client0 .clone0 ()); |
113 | | |
114 | 2 | let mut objects = Vec::new(); |
115 | 2 | let mut continuation_token = None; |
116 | | |
117 | | loop { |
118 | 2 | let mut request = client |
119 | 2 | .list_objects_v2() |
120 | 2 | .bucket(&self.bucket) |
121 | 2 | .prefix(self.key_prefix.as_deref().unwrap_or("")) |
122 | 2 | .max_keys(1000); // Process in smaller batches for better responsiveness |
123 | | |
124 | 2 | if let Some(token0 ) = &continuation_token { Branch (124:20): [True: 0, False: 0]
Branch (124:20): [Folded - Ignored]
Branch (124:20): [True: 0, False: 2]
|
125 | 0 | request = request.continuation_token(token); |
126 | 2 | } |
127 | | |
128 | | // Add a timeout to avoid hanging indefinitely |
129 | 2 | let result = match tokio::time::timeout(Duration::from_secs(30), request.send()).await { |
130 | 2 | Ok(Ok(response)) => response, |
131 | 0 | Ok(Err(e)) => { |
132 | | // Log the error but continue |
133 | 0 | event!(Level::ERROR, ?e, "Error listing objects in ONTAP S3"); |
134 | 0 | return Err(make_err!(Code::Internal, "Failed to list objects: {}", e)); |
135 | | } |
136 | | Err(_) => { |
137 | | // Timeout occurred |
138 | 0 | event!(Level::ERROR, "Timeout listing objects in ONTAP S3"); |
139 | 0 | return Err(make_err!(Code::Internal, "Timeout listing objects")); |
140 | | } |
141 | | }; |
142 | | |
143 | 2 | if let Some(contents) = result.contents { Branch (143:20): [True: 0, False: 0]
Branch (143:20): [Folded - Ignored]
Branch (143:20): [True: 2, False: 0]
|
144 | 7 | for object5 in contents { |
145 | 5 | if let Some(key) = object.key { Branch (145:28): [True: 0, False: 0]
Branch (145:28): [Folded - Ignored]
Branch (145:28): [True: 5, False: 0]
|
146 | 5 | objects.push(key); |
147 | 5 | }0 |
148 | | } |
149 | 0 | } |
150 | | |
151 | 2 | match result.next_continuation_token { |
152 | 0 | Some(token) => { |
153 | 0 | continuation_token = Some(token); |
154 | | // Yield to other tasks periodically to avoid monopolizing the runtime |
155 | 0 | tokio::task::yield_now().await; |
156 | | } |
157 | 2 | None => break, |
158 | | } |
159 | | } |
160 | 2 | event!(Level::INFO, "Listed {} objects from S3", objects.len()); |
161 | 2 | event!(Level::INFO, "Used prefix: {:?}", self.key_prefix); |
162 | 2 | Ok(objects) |
163 | 2 | } |
164 | | |
165 | 5 | async fn parse_key_to_digest(&self, key: &str) -> Option<DigestInfo> { |
166 | | // Remove the prefix if it exists |
167 | 5 | let relative_key = key |
168 | 5 | .strip_prefix(self.key_prefix.as_deref().unwrap_or("")) |
169 | 5 | .unwrap_or(key) |
170 | 5 | .trim_start_matches('/'); |
171 | | |
172 | | // Skip the directory entry itself |
173 | 5 | if relative_key.is_empty() { Branch (173:12): [True: 0, False: 0]
Branch (173:12): [Folded - Ignored]
Branch (173:12): [True: 0, False: 5]
|
174 | 0 | return None; |
175 | 5 | } |
176 | | |
177 | 5 | let parts: Vec<&str> = relative_key.split('-').collect(); |
178 | | |
179 | 5 | if parts.len() != 2 { Branch (179:12): [True: 0, False: 0]
Branch (179:12): [Folded - Ignored]
Branch (179:12): [True: 0, False: 5]
|
180 | 0 | return None; |
181 | 5 | } |
182 | | |
183 | 5 | match (hex::decode(parts[0]), parts[1].parse::<u64>()) { |
184 | 5 | (Ok(hash), Ok(size)) => { |
185 | 5 | if hash.len() == 32 { Branch (185:20): [True: 0, False: 0]
Branch (185:20): [Folded - Ignored]
Branch (185:20): [True: 5, False: 0]
|
186 | | // Clone the hash before converting |
187 | 5 | let hash_array: [u8; 32] = hash.try_into().unwrap(); |
188 | 5 | let digest = DigestInfo::new(hash_array, size); |
189 | | |
190 | 5 | Some(digest) |
191 | | } else { |
192 | 0 | None |
193 | | } |
194 | | } |
195 | 0 | _ => None, |
196 | | } |
197 | 5 | } |
198 | | |
199 | 2 | pub async fn run_sync(&self) -> Result<(), Error> { |
200 | | // Only run if not already in progress |
201 | 2 | if self Branch (201:12): [True: 0, False: 0]
Branch (201:12): [Folded - Ignored]
Branch (201:12): [True: 0, False: 2]
|
202 | 2 | .sync_in_progress |
203 | 2 | .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) |
204 | 2 | .is_err() |
205 | | { |
206 | | // Sync already in progress, just return |
207 | 0 | return Ok(()); |
208 | 2 | } |
209 | | |
210 | | // Use a finally block equivalent to ensure we clear the flag even if there's an error |
211 | 2 | let result = self.sync_cache(None).await; |
212 | | |
213 | | // Clear the in-progress flag |
214 | 2 | self.sync_in_progress.store(false, Ordering::SeqCst); |
215 | | |
216 | 2 | result |
217 | 2 | } |
218 | | |
219 | 2 | async fn sync_cache(&self, test_client: Option<Arc<Client>>) -> Result<(), Error> { |
220 | 2 | let client = test_client.unwrap_or_else(|| self.s3_client.clone()); |
221 | 2 | let current_time = (self.now_fn)().unix_timestamp(); |
222 | | |
223 | | // Load existing cache to compare against |
224 | 2 | let mut existing_digests = HashSet::new(); |
225 | 2 | if let Ok(contents0 ) = fs::read_to_string(&self.index_path).await { Branch (225:16): [True: 0, False: 0]
Branch (225:16): [Folded - Ignored]
Branch (225:16): [True: 0, False: 2]
|
226 | 0 | if let Ok(cache_file) = serde_json::from_str::<CacheFile>(&contents) { Branch (226:20): [True: 0, False: 0]
Branch (226:20): [Folded - Ignored]
Branch (226:20): [True: 0, False: 0]
|
227 | 0 | existing_digests = cache_file.digests; |
228 | 0 | } |
229 | 2 | } |
230 | | |
231 | | // Get all objects |
232 | 2 | let objects = match self.list_objects(Some(client)).await { |
233 | 2 | Ok(objects) => objects, |
234 | 0 | Err(e) => { |
235 | 0 | event!(Level::ERROR, ?e, "Failed to list objects during cache sync"); |
236 | | // On error, keep using the existing cache |
237 | 0 | return Err(e); |
238 | | } |
239 | | }; |
240 | | |
241 | 2 | let mut new_digests = HashSet::with_capacity(objects.len()); |
242 | | |
243 | | // Process objects in chunks to avoid long processing times |
244 | 2 | for chunk in objects.chunks(1000) { |
245 | 7 | for obj_key5 in chunk { |
246 | 5 | if let Some(digest) = self.parse_key_to_digest(obj_key).await { Branch (246:24): [True: 0, False: 0]
Branch (246:24): [Folded - Ignored]
Branch (246:24): [True: 5, False: 0]
|
247 | 5 | new_digests.insert(digest); |
248 | 5 | }0 |
249 | | } |
250 | | // Yield to other tasks periodically |
251 | 2 | tokio::task::yield_now().await; |
252 | | } |
253 | | |
254 | | // Only write the cache file if something changed |
255 | 2 | if new_digests != existing_digests { Branch (255:12): [True: 0, False: 0]
Branch (255:12): [Folded - Ignored]
Branch (255:12): [True: 2, False: 0]
|
256 | 2 | let cache_file = CacheFile { |
257 | 2 | digests: new_digests.clone(), |
258 | 2 | timestamp: current_time, |
259 | 2 | }; |
260 | | |
261 | 2 | let json = serde_json::to_string(&cache_file) |
262 | 2 | .map_err(|e| make_err!(Code::Internal0 , "Failed to serialize cache: {}", e))?0 ; |
263 | | |
264 | 2 | fs::write(&self.index_path, json) |
265 | 2 | .await |
266 | 2 | .map_err(|e| make_err!(Code::Internal0 , "Failed to write cache file: {}", e))?0 ; |
267 | 0 | } |
268 | | |
269 | | // Update the in-memory cache - this can be done regardless of file write status |
270 | 2 | *self.digests.write().await = new_digests; |
271 | 2 | self.last_sync.store(current_time, Ordering::SeqCst); |
272 | | |
273 | 2 | Ok(()) |
274 | 2 | } |
275 | | |
276 | 3 | pub async fn new(spec: &OntapS3ExistenceCacheSpec, now_fn: NowFn) -> Result<Arc<Self>, Error> { |
277 | 3 | let inner_spec = &spec.backend; |
278 | 3 | let inner_store = Arc::new(OntapS3Store::new(inner_spec, now_fn.clone()).await?0 ); |
279 | 3 | let inner_store = Store::new((*inner_store).clone()); |
280 | 3 | let s3_client = create_s3_client(inner_spec).await?0 ; |
281 | | |
282 | 3 | let digests = Arc::new(RwLock::new(HashSet::new())); |
283 | 3 | let last_sync = Arc::new(AtomicU64::new(0)); |
284 | 3 | let sync_in_progress = Arc::new(AtomicBool::new(false)); |
285 | | |
286 | | // Create the cache instance |
287 | 3 | let cache = Arc::new(Self { |
288 | 3 | inner_store, |
289 | 3 | s3_client: Arc::new(s3_client), |
290 | 3 | bucket: inner_spec.bucket.clone(), |
291 | 3 | key_prefix: inner_spec.common.key_prefix.clone(), |
292 | 3 | index_path: spec.index_path.clone(), |
293 | 3 | digests: digests.clone(), |
294 | 3 | sync_interval_seconds: spec.sync_interval_seconds, |
295 | 3 | last_sync: last_sync.clone(), |
296 | 3 | now_fn, |
297 | 3 | sync_in_progress, |
298 | 3 | cache_hits: CounterWithTime::default(), |
299 | 3 | cache_misses: CounterWithTime::default(), |
300 | 3 | }); |
301 | | |
302 | | // Try to load existing cache file |
303 | 3 | if let Ok(contents0 ) = fs::read_to_string(&spec.index_path).await { Branch (303:16): [True: 0, False: 3]
Branch (303:16): [Folded - Ignored]
|
304 | 0 | let cache_file = serde_json::from_str::<CacheFile>(&contents) |
305 | 0 | .map_err(|e| make_err!(Code::Internal, "Failed to parse cache file: {}", e))?; |
306 | 0 | *cache.digests.write().await = cache_file.digests; |
307 | 0 | cache |
308 | 0 | .last_sync |
309 | 0 | .store(cache_file.timestamp, Ordering::SeqCst); |
310 | 3 | } |
311 | | |
312 | | // Start the background sync task |
313 | 3 | let cache_clone = cache.clone(); |
314 | 3 | let _handle = spawn!("ontap_s3_existence_cache_sync", async move {0 |
315 | | // Wait a bit before starting the sync to allow the service to initialize |
316 | 0 | sleep(Duration::from_secs(5)).await; |
317 | | |
318 | | // Run an initial sync if the cache is empty |
319 | 0 | if cache_clone.digests.read().await.is_empty() { Branch (319:16): [True: 0, False: 0]
Branch (319:16): [Folded - Ignored]
|
320 | 0 | event!(Level::INFO, "Running initial cache sync"); |
321 | 0 | if let Err(e) = cache_clone.run_sync().await { Branch (321:24): [True: 0, False: 0]
Branch (321:24): [Folded - Ignored]
|
322 | 0 | event!(Level::ERROR, ?e, "Initial cache sync error"); |
323 | 0 | } |
324 | 0 | } |
325 | | |
326 | | // Set up periodic background sync |
327 | 0 | let mut interval = interval(Duration::from_secs(u64::from( |
328 | 0 | cache_clone.sync_interval_seconds, |
329 | | ))); |
330 | | loop { |
331 | 0 | interval.tick().await; |
332 | 0 | event!(Level::INFO, "Starting periodic cache sync"); |
333 | 0 | if let Err(e) = cache_clone.run_sync().await { Branch (333:24): [True: 0, False: 0]
Branch (333:24): [Folded - Ignored]
|
334 | 0 | event!(Level::ERROR, ?e, "Background sync error"); |
335 | | } else { |
336 | 0 | event!(Level::INFO, "Completed periodic cache sync"); |
337 | | } |
338 | | } |
339 | | }); |
340 | | |
341 | 3 | Ok(cache) |
342 | 3 | } |
343 | | } |
344 | | |
345 | 3 | async fn create_s3_client(spec: &ExperimentalOntapS3Spec) -> Result<Client, Error> { |
346 | 3 | let http_client = TlsClient::new(&spec.common); |
347 | 3 | let credentials_provider = DefaultCredentialsChain::builder() |
348 | 3 | .configure( |
349 | 3 | ProviderConfig::without_region() |
350 | 3 | .with_region(Some(Region::new(Cow::Owned(spec.vserver_name.clone())))) |
351 | 3 | .with_http_client(http_client.clone()), |
352 | 3 | ) |
353 | 3 | .build() |
354 | 3 | .await; |
355 | | |
356 | 3 | let config = aws_sdk_s3::Config::builder() |
357 | 3 | .credentials_provider(credentials_provider) |
358 | 3 | .endpoint_url(&spec.endpoint) |
359 | 3 | .region(Region::new(spec.vserver_name.clone())) |
360 | 3 | .force_path_style(true) |
361 | 3 | .behavior_version(BehaviorVersion::v2025_01_17()) |
362 | 3 | .build(); |
363 | | |
364 | 3 | Ok(Client::from_conf(config)) |
365 | 3 | } |
366 | | |
367 | | #[async_trait] |
368 | | impl<I, NowFn> StoreDriver for OntapS3ExistenceCache<I, NowFn> |
369 | | where |
370 | | I: InstantWrapper, |
371 | | NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static, |
372 | | { |
373 | | async fn has_with_results( |
374 | | self: Pin<&Self>, |
375 | | keys: &[StoreKey<'_>], |
376 | | results: &mut [Option<u64>], |
377 | 10 | ) -> Result<(), Error> { |
378 | 5 | let cache = self.digests.read().await; |
379 | | |
380 | 7 | for (key, result) in keys5 .iter5 ().zip5 (results5 .iter_mut5 ()) { |
381 | | // Handle zero digest case |
382 | 7 | if is_zero_digest(key.borrow()) { Branch (382:16): [True: 1, False: 1]
Branch (382:16): [Folded - Ignored]
Branch (382:16): [True: 0, False: 5]
|
383 | 1 | *result = Some(0); |
384 | 1 | self.cache_hits.inc(); |
385 | 1 | continue; |
386 | 6 | } |
387 | | |
388 | 6 | let digest = key.borrow().into_digest(); |
389 | | |
390 | | // If not in cache, definitely doesn't exist |
391 | 6 | if !cache.contains(&digest) { Branch (391:16): [True: 1, False: 0]
Branch (391:16): [Folded - Ignored]
Branch (391:16): [True: 0, False: 5]
|
392 | 1 | *result = None; |
393 | 1 | self.cache_misses.inc(); |
394 | 1 | continue; |
395 | 5 | } |
396 | | // If in cache, check actual store |
397 | 5 | match self.inner_store.has(key.borrow()).await { |
398 | 5 | Ok(size) => { |
399 | 5 | *result = size; |
400 | 5 | self.cache_hits.inc(); |
401 | 5 | } |
402 | 0 | Err(e) => { |
403 | 0 | return Err(e); |
404 | | } |
405 | | } |
406 | | } |
407 | | |
408 | 5 | Ok(()) |
409 | 10 | } |
410 | | |
411 | | async fn get_part( |
412 | | self: Pin<&Self>, |
413 | | key: StoreKey<'_>, |
414 | | writer: &mut DropCloserWriteHalf, |
415 | | offset: u64, |
416 | | length: Option<u64>, |
417 | 4 | ) -> Result<(), Error> { |
418 | 2 | if is_zero_digest(key.borrow()) { Branch (418:12): [True: 1, False: 1]
Branch (418:12): [Folded - Ignored]
Branch (418:12): [True: 0, False: 0]
|
419 | 1 | writer.send_eof().err_tip(|| "Failed to send zero EOF")?0 ; |
420 | 1 | return Ok(()); |
421 | 1 | } |
422 | | |
423 | | // Only pass through if in cache |
424 | 1 | if !self Branch (424:12): [True: 1, False: 0]
Branch (424:12): [Folded - Ignored]
Branch (424:12): [True: 0, False: 0]
|
425 | 1 | .digests |
426 | 1 | .read() |
427 | 1 | .await |
428 | 1 | .contains(&key.borrow().into_digest()) |
429 | | { |
430 | 1 | return Err(make_err!(Code::NotFound, "Object not in cache")); |
431 | 0 | } |
432 | | |
433 | 0 | self.inner_store.get_part(key, writer, offset, length).await |
434 | 4 | } |
435 | | |
436 | | async fn update( |
437 | | self: Pin<&Self>, |
438 | | key: StoreKey<'_>, |
439 | | reader: DropCloserReadHalf, |
440 | | size_info: UploadSizeInfo, |
441 | 0 | ) -> Result<(), Error> { |
442 | 0 | let key_owned = key.into_owned(); |
443 | 0 | let result = self |
444 | 0 | .inner_store |
445 | 0 | .update(key_owned.clone(), reader, size_info) |
446 | 0 | .await; |
447 | 0 | if result.is_ok() { Branch (447:12): [True: 0, False: 0]
Branch (447:12): [Folded - Ignored]
Branch (447:12): [True: 0, False: 0]
|
448 | 0 | self.digests.write().await.insert(key_owned.into_digest()); |
449 | 0 | } |
450 | 0 | result |
451 | 0 | } |
452 | | |
453 | 0 | fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver { |
454 | 0 | self |
455 | 0 | } |
456 | | |
457 | 0 | fn as_any(&self) -> &(dyn core::any::Any + Send + Sync + 'static) { |
458 | 0 | self |
459 | 0 | } |
460 | | |
461 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Send + Sync + 'static> { |
462 | 0 | self |
463 | 0 | } |
464 | | } |
465 | | |
466 | | #[async_trait] |
467 | | impl<I, NowFn> HealthStatusIndicator for OntapS3ExistenceCache<I, NowFn> |
468 | | where |
469 | | I: InstantWrapper, |
470 | | NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static, |
471 | | { |
472 | 0 | fn get_name(&self) -> &'static str { |
473 | 0 | "OntapS3ExistenceCache" |
474 | 0 | } |
475 | | |
476 | 0 | async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { |
477 | 0 | StoreDriver::check_health(Pin::new(self), namespace).await |
478 | 0 | } |
479 | | } |