/build/source/nativelink-store/src/ontap_s3_existence_cache_store.rs
Line | Count | Source |
1 | | // Copyright 2025 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::fmt::Debug; |
16 | | use core::pin::Pin; |
17 | | use core::sync::atomic::{AtomicBool, AtomicU64, Ordering}; |
18 | | use core::time::Duration; |
19 | | use std::borrow::Cow; |
20 | | use std::collections::HashSet; |
21 | | use std::sync::{Arc, Weak}; |
22 | | |
23 | | use async_trait::async_trait; |
24 | | use aws_config::BehaviorVersion; |
25 | | use aws_config::default_provider::credentials::DefaultCredentialsChain; |
26 | | use aws_config::provider_config::ProviderConfig; |
27 | | use aws_sdk_s3::Client; |
28 | | use aws_sdk_s3::config::Region; |
29 | | use nativelink_config::stores::{ExperimentalOntapS3Spec, OntapS3ExistenceCacheSpec}; |
30 | | use nativelink_error::{Code, Error, ResultExt, make_err}; |
31 | | use nativelink_metric::MetricsComponent; |
32 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
33 | | use nativelink_util::common::DigestInfo; |
34 | | use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator}; |
35 | | use nativelink_util::instant_wrapper::InstantWrapper; |
36 | | use nativelink_util::metrics_utils::CounterWithTime; |
37 | | use nativelink_util::spawn; |
38 | | use nativelink_util::store_trait::{ |
39 | | RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo, |
40 | | }; |
41 | | use serde::{Deserialize, Serialize}; |
42 | | use tokio::fs; |
43 | | use tokio::sync::RwLock; |
44 | | use tokio::time::{interval, sleep}; |
45 | | use tracing::{Level, debug, event}; |
46 | | |
47 | | use crate::cas_utils::is_zero_digest; |
48 | | use crate::common_s3_utils::TlsClient; |
49 | | use crate::ontap_s3_store::OntapS3Store; |
50 | | |
51 | | #[derive(Serialize, Deserialize)] |
52 | | struct CacheFile { |
53 | | digests: HashSet<DigestInfo>, |
54 | | timestamp: u64, |
55 | | } |
56 | | |
57 | | #[derive(MetricsComponent)] |
58 | | pub struct OntapS3ExistenceCache<I, NowFn> |
59 | | where |
60 | | I: InstantWrapper, |
61 | | NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static, |
62 | | { |
63 | | inner_store: Store, |
64 | | s3_client: Arc<Client>, |
65 | | now_fn: NowFn, |
66 | | bucket: String, |
67 | | key_prefix: Option<String>, |
68 | | index_path: String, |
69 | | digests: Arc<RwLock<HashSet<DigestInfo>>>, |
70 | | sync_interval_seconds: u32, |
71 | | last_sync: Arc<AtomicU64>, |
72 | | sync_in_progress: Arc<AtomicBool>, |
73 | | |
74 | | #[metric(help = "Number of cache hits")] |
75 | | cache_hits: CounterWithTime, |
76 | | #[metric(help = "Number of cache misses")] |
77 | | cache_misses: CounterWithTime, |
78 | | } |
79 | | |
80 | | struct OntapS3CacheCallback<I, NowFn> |
81 | | where |
82 | | I: InstantWrapper, |
83 | | NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static, |
84 | | { |
85 | | cache: Weak<OntapS3ExistenceCache<I, NowFn>>, |
86 | | } |
87 | | |
88 | | impl<I, NowFn> Debug for OntapS3CacheCallback<I, NowFn> |
89 | | where |
90 | | I: InstantWrapper, |
91 | | NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static, |
92 | | { |
93 | 0 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
94 | 0 | f.debug_struct("OntapS3CacheCallback") |
95 | 0 | .field("cache", &self.cache) |
96 | 0 | .finish() |
97 | 0 | } |
98 | | } |
99 | | |
100 | | impl<I, NowFn> RemoveItemCallback for OntapS3CacheCallback<I, NowFn> |
101 | | where |
102 | | I: InstantWrapper, |
103 | | NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static, |
104 | | { |
105 | 0 | fn callback<'a>( |
106 | 0 | &'a self, |
107 | 0 | store_key: StoreKey<'a>, |
108 | 0 | ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> { |
109 | 0 | let cache = self.cache.upgrade(); |
110 | 0 | if let Some(local_cache) = cache { Branch (110:16): [True: 0, False: 0]
Branch (110:16): [Folded - Ignored]
|
111 | 0 | Box::pin(async move { |
112 | 0 | local_cache.callback(store_key).await; |
113 | 0 | }) |
114 | | } else { |
115 | 0 | debug!("Cache dropped, so not doing callback"); |
116 | 0 | Box::pin(async {}) |
117 | | } |
118 | 0 | } |
119 | | } |
120 | | |
121 | | impl<I, NowFn> Debug for OntapS3ExistenceCache<I, NowFn> |
122 | | where |
123 | | I: InstantWrapper, |
124 | | NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static, |
125 | | { |
126 | 0 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
127 | 0 | f.debug_struct("OntapS3ExistenceCache") |
128 | 0 | .field("inner_store", &self.inner_store) |
129 | 0 | .field("s3_client", &self.s3_client) |
130 | 0 | .field("bucket", &self.bucket) |
131 | 0 | .field("key_prefix", &self.key_prefix) |
132 | 0 | .field("index_path", &self.index_path) |
133 | 0 | .field("digests", &self.digests) |
134 | 0 | .field("sync_interval_seconds", &self.sync_interval_seconds) |
135 | 0 | .field("last_sync", &self.last_sync) |
136 | 0 | .field("sync_in_progress", &self.sync_in_progress) |
137 | 0 | .field("cache_hits", &self.cache_hits) |
138 | 0 | .field("cache_misses", &self.cache_misses) |
139 | 0 | .finish() |
140 | 0 | } |
141 | | } |
142 | | |
143 | | impl<I, NowFn> OntapS3ExistenceCache<I, NowFn> |
144 | | where |
145 | | I: InstantWrapper, |
146 | | NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static, |
147 | | { |
148 | 2 | pub fn new_for_testing( |
149 | 2 | inner_store: Store, |
150 | 2 | s3_client: Arc<Client>, |
151 | 2 | cache_file_path: String, |
152 | 2 | digests: HashSet<DigestInfo>, |
153 | 2 | sync_interval_seconds: u32, |
154 | 2 | now_fn: NowFn, |
155 | 2 | ) -> Arc<Self> { |
156 | 2 | Arc::new(Self { |
157 | 2 | inner_store, |
158 | 2 | s3_client, |
159 | 2 | bucket: "test-bucket".to_string(), |
160 | 2 | key_prefix: None, |
161 | 2 | index_path: cache_file_path, |
162 | 2 | digests: Arc::new(RwLock::new(digests)), |
163 | 2 | sync_interval_seconds, |
164 | 2 | last_sync: Arc::new(AtomicU64::new( |
165 | 2 | std::time::SystemTime::now() |
166 | 2 | .duration_since(std::time::UNIX_EPOCH) |
167 | 2 | .unwrap_or_default() |
168 | 2 | .as_secs(), |
169 | 2 | )), |
170 | 2 | sync_in_progress: Arc::new(AtomicBool::new(false)), |
171 | 2 | now_fn, |
172 | 2 | cache_hits: CounterWithTime::default(), |
173 | 2 | cache_misses: CounterWithTime::default(), |
174 | 2 | }) |
175 | 2 | } |
176 | | |
177 | 2 | async fn list_objects(&self, test_client: Option<Arc<Client>>) -> Result<Vec<String>, Error> { |
178 | 2 | let client = test_client.unwrap_or_else(|| self.s3_client0 .clone0 ()); |
179 | | |
180 | 2 | let mut objects = Vec::new(); |
181 | 2 | let mut continuation_token = None; |
182 | | |
183 | | loop { |
184 | 2 | let mut request = client |
185 | 2 | .list_objects_v2() |
186 | 2 | .bucket(&self.bucket) |
187 | 2 | .prefix(self.key_prefix.as_deref().unwrap_or("")) |
188 | 2 | .max_keys(1000); // Process in smaller batches for better responsiveness |
189 | | |
190 | 2 | if let Some(token0 ) = &continuation_token { Branch (190:20): [True: 0, False: 0]
Branch (190:20): [Folded - Ignored]
Branch (190:20): [True: 0, False: 2]
|
191 | 0 | request = request.continuation_token(token); |
192 | 2 | } |
193 | | |
194 | | // Add a timeout to avoid hanging indefinitely |
195 | 2 | let result = match tokio::time::timeout(Duration::from_secs(30), request.send()).await { |
196 | 2 | Ok(Ok(response)) => response, |
197 | 0 | Ok(Err(e)) => { |
198 | | // Log the error but continue |
199 | 0 | event!(Level::ERROR, ?e, "Error listing objects in ONTAP S3"); |
200 | 0 | return Err(make_err!(Code::Internal, "Failed to list objects: {}", e)); |
201 | | } |
202 | | Err(_) => { |
203 | | // Timeout occurred |
204 | 0 | event!(Level::ERROR, "Timeout listing objects in ONTAP S3"); |
205 | 0 | return Err(make_err!(Code::Internal, "Timeout listing objects")); |
206 | | } |
207 | | }; |
208 | | |
209 | 2 | if let Some(contents) = result.contents { Branch (209:20): [True: 0, False: 0]
Branch (209:20): [Folded - Ignored]
Branch (209:20): [True: 2, False: 0]
|
210 | 7 | for object5 in contents { |
211 | 5 | if let Some(key) = object.key { Branch (211:28): [True: 0, False: 0]
Branch (211:28): [Folded - Ignored]
Branch (211:28): [True: 5, False: 0]
|
212 | 5 | objects.push(key); |
213 | 5 | }0 |
214 | | } |
215 | 0 | } |
216 | | |
217 | 2 | match result.next_continuation_token { |
218 | 0 | Some(token) => { |
219 | 0 | continuation_token = Some(token); |
220 | | // Yield to other tasks periodically to avoid monopolizing the runtime |
221 | 0 | tokio::task::yield_now().await; |
222 | | } |
223 | 2 | None => break, |
224 | | } |
225 | | } |
226 | 2 | event!(Level::INFO, "Listed {} objects from S3", objects.len()); |
227 | 2 | event!(Level::INFO, "Used prefix: {:?}", self.key_prefix); |
228 | 2 | Ok(objects) |
229 | 2 | } |
230 | | |
231 | 5 | async fn parse_key_to_digest(&self, key: &str) -> Option<DigestInfo> { |
232 | | // Remove the prefix if it exists |
233 | 5 | let relative_key = key |
234 | 5 | .strip_prefix(self.key_prefix.as_deref().unwrap_or("")) |
235 | 5 | .unwrap_or(key) |
236 | 5 | .trim_start_matches('/'); |
237 | | |
238 | | // Skip the directory entry itself |
239 | 5 | if relative_key.is_empty() { Branch (239:12): [True: 0, False: 0]
Branch (239:12): [Folded - Ignored]
Branch (239:12): [True: 0, False: 5]
|
240 | 0 | return None; |
241 | 5 | } |
242 | | |
243 | 5 | let parts: Vec<&str> = relative_key.split('-').collect(); |
244 | | |
245 | 5 | if parts.len() != 2 { Branch (245:12): [True: 0, False: 0]
Branch (245:12): [Folded - Ignored]
Branch (245:12): [True: 0, False: 5]
|
246 | 0 | return None; |
247 | 5 | } |
248 | | |
249 | 5 | match (hex::decode(parts[0]), parts[1].parse::<u64>()) { |
250 | 5 | (Ok(hash), Ok(size)) => { |
251 | 5 | if hash.len() == 32 { Branch (251:20): [True: 0, False: 0]
Branch (251:20): [Folded - Ignored]
Branch (251:20): [True: 5, False: 0]
|
252 | | // Clone the hash before converting |
253 | 5 | let hash_array: [u8; 32] = hash.try_into().unwrap(); |
254 | 5 | let digest = DigestInfo::new(hash_array, size); |
255 | | |
256 | 5 | Some(digest) |
257 | | } else { |
258 | 0 | None |
259 | | } |
260 | | } |
261 | 0 | _ => None, |
262 | | } |
263 | 5 | } |
264 | | |
265 | 2 | pub async fn run_sync(&self) -> Result<(), Error> { |
266 | | // Only run if not already in progress |
267 | 2 | if self Branch (267:12): [True: 0, False: 0]
Branch (267:12): [Folded - Ignored]
Branch (267:12): [True: 0, False: 2]
|
268 | 2 | .sync_in_progress |
269 | 2 | .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) |
270 | 2 | .is_err() |
271 | | { |
272 | | // Sync already in progress, just return |
273 | 0 | return Ok(()); |
274 | 2 | } |
275 | | |
276 | | // Use a finally block equivalent to ensure we clear the flag even if there's an error |
277 | 2 | let result = self.sync_cache(None).await; |
278 | | |
279 | | // Clear the in-progress flag |
280 | 2 | self.sync_in_progress.store(false, Ordering::SeqCst); |
281 | | |
282 | 2 | result |
283 | 2 | } |
284 | | |
285 | 2 | async fn sync_cache(&self, test_client: Option<Arc<Client>>) -> Result<(), Error> { |
286 | 2 | let client = test_client.unwrap_or_else(|| self.s3_client.clone()); |
287 | 2 | let current_time = (self.now_fn)().unix_timestamp(); |
288 | | |
289 | | // Load existing cache to compare against |
290 | 2 | let mut existing_digests = HashSet::new(); |
291 | 2 | if let Ok(contents0 ) = fs::read_to_string(&self.index_path).await { Branch (291:16): [True: 0, False: 0]
Branch (291:16): [Folded - Ignored]
Branch (291:16): [True: 0, False: 2]
|
292 | 0 | if let Ok(cache_file) = serde_json::from_str::<CacheFile>(&contents) { Branch (292:20): [True: 0, False: 0]
Branch (292:20): [Folded - Ignored]
Branch (292:20): [True: 0, False: 0]
|
293 | 0 | existing_digests = cache_file.digests; |
294 | 0 | } |
295 | 2 | } |
296 | | |
297 | | // Get all objects |
298 | 2 | let objects = match self.list_objects(Some(client)).await { |
299 | 2 | Ok(objects) => objects, |
300 | 0 | Err(e) => { |
301 | 0 | event!(Level::ERROR, ?e, "Failed to list objects during cache sync"); |
302 | | // On error, keep using the existing cache |
303 | 0 | return Err(e); |
304 | | } |
305 | | }; |
306 | | |
307 | 2 | let mut new_digests = HashSet::with_capacity(objects.len()); |
308 | | |
309 | | // Process objects in chunks to avoid long processing times |
310 | 2 | for chunk in objects.chunks(1000) { |
311 | 7 | for obj_key5 in chunk { |
312 | 5 | if let Some(digest) = self.parse_key_to_digest(obj_key).await { Branch (312:24): [True: 0, False: 0]
Branch (312:24): [Folded - Ignored]
Branch (312:24): [True: 5, False: 0]
|
313 | 5 | new_digests.insert(digest); |
314 | 5 | }0 |
315 | | } |
316 | | // Yield to other tasks periodically |
317 | 2 | tokio::task::yield_now().await; |
318 | | } |
319 | | |
320 | | // Only write the cache file if something changed |
321 | 2 | if new_digests != existing_digests { Branch (321:12): [True: 0, False: 0]
Branch (321:12): [Folded - Ignored]
Branch (321:12): [True: 2, False: 0]
|
322 | 2 | let cache_file = CacheFile { |
323 | 2 | digests: new_digests.clone(), |
324 | 2 | timestamp: current_time, |
325 | 2 | }; |
326 | | |
327 | 2 | let json = serde_json::to_string(&cache_file) |
328 | 2 | .map_err(|e| make_err!(Code::Internal0 , "Failed to serialize cache: {}", e))?0 ; |
329 | | |
330 | 2 | fs::write(&self.index_path, json) |
331 | 2 | .await |
332 | 2 | .map_err(|e| make_err!(Code::Internal0 , "Failed to write cache file: {}", e))?0 ; |
333 | 0 | } |
334 | | |
335 | | // Update the in-memory cache - this can be done regardless of file write status |
336 | 2 | *self.digests.write().await = new_digests; |
337 | 2 | self.last_sync.store(current_time, Ordering::SeqCst); |
338 | | |
339 | 2 | Ok(()) |
340 | 2 | } |
341 | | |
342 | 3 | pub async fn new(spec: &OntapS3ExistenceCacheSpec, now_fn: NowFn) -> Result<Arc<Self>, Error> { |
343 | 3 | let inner_spec = &spec.backend; |
344 | 3 | let inner_store = Arc::new(OntapS3Store::new(inner_spec, now_fn.clone()).await?0 ); |
345 | 3 | let inner_store = Store::new((*inner_store).clone()); |
346 | 3 | let s3_client = create_s3_client(inner_spec).await?0 ; |
347 | | |
348 | 3 | let digests = Arc::new(RwLock::new(HashSet::new())); |
349 | 3 | let last_sync = Arc::new(AtomicU64::new(0)); |
350 | 3 | let sync_in_progress = Arc::new(AtomicBool::new(false)); |
351 | | |
352 | | // Create the cache instance |
353 | 3 | let cache = Arc::new(Self { |
354 | 3 | inner_store, |
355 | 3 | s3_client: Arc::new(s3_client), |
356 | 3 | bucket: inner_spec.bucket.clone(), |
357 | 3 | key_prefix: inner_spec.common.key_prefix.clone(), |
358 | 3 | index_path: spec.index_path.clone(), |
359 | 3 | digests: digests.clone(), |
360 | 3 | sync_interval_seconds: spec.sync_interval_seconds, |
361 | 3 | last_sync: last_sync.clone(), |
362 | 3 | now_fn, |
363 | 3 | sync_in_progress, |
364 | 3 | cache_hits: CounterWithTime::default(), |
365 | 3 | cache_misses: CounterWithTime::default(), |
366 | 3 | }); |
367 | | |
368 | 3 | let other_ref = Arc::downgrade(&cache); |
369 | 3 | cache |
370 | 3 | .inner_store |
371 | 3 | .register_remove_callback(Arc::new(OntapS3CacheCallback { cache: other_ref }))?0 ; |
372 | | |
373 | | // Try to load existing cache file |
374 | 3 | if let Ok(contents0 ) = fs::read_to_string(&spec.index_path).await { Branch (374:16): [True: 0, False: 3]
Branch (374:16): [Folded - Ignored]
|
375 | 0 | let cache_file = serde_json::from_str::<CacheFile>(&contents) |
376 | 0 | .map_err(|e| make_err!(Code::Internal, "Failed to parse cache file: {}", e))?; |
377 | 0 | *cache.digests.write().await = cache_file.digests; |
378 | 0 | cache |
379 | 0 | .last_sync |
380 | 0 | .store(cache_file.timestamp, Ordering::SeqCst); |
381 | 3 | } |
382 | | |
383 | | // Start the background sync task |
384 | 3 | let cache_clone = cache.clone(); |
385 | 3 | let _handle = spawn!("ontap_s3_existence_cache_sync", async move {0 |
386 | | // Wait a bit before starting the sync to allow the service to initialize |
387 | 0 | sleep(Duration::from_secs(5)).await; |
388 | | |
389 | | // Run an initial sync if the cache is empty |
390 | 0 | if cache_clone.digests.read().await.is_empty() { Branch (390:16): [True: 0, False: 0]
Branch (390:16): [Folded - Ignored]
|
391 | 0 | event!(Level::INFO, "Running initial cache sync"); |
392 | 0 | if let Err(e) = cache_clone.run_sync().await { Branch (392:24): [True: 0, False: 0]
Branch (392:24): [Folded - Ignored]
|
393 | 0 | event!(Level::ERROR, ?e, "Initial cache sync error"); |
394 | 0 | } |
395 | 0 | } |
396 | | |
397 | | // Set up periodic background sync |
398 | 0 | let mut interval = interval(Duration::from_secs(u64::from( |
399 | 0 | cache_clone.sync_interval_seconds, |
400 | | ))); |
401 | | loop { |
402 | 0 | interval.tick().await; |
403 | 0 | event!(Level::INFO, "Starting periodic cache sync"); |
404 | 0 | if let Err(e) = cache_clone.run_sync().await { Branch (404:24): [True: 0, False: 0]
Branch (404:24): [Folded - Ignored]
|
405 | 0 | event!(Level::ERROR, ?e, "Background sync error"); |
406 | | } else { |
407 | 0 | event!(Level::INFO, "Completed periodic cache sync"); |
408 | | } |
409 | | } |
410 | | }); |
411 | | |
412 | 3 | Ok(cache) |
413 | 3 | } |
414 | | } |
415 | | |
416 | 3 | async fn create_s3_client(spec: &ExperimentalOntapS3Spec) -> Result<Client, Error> { |
417 | 3 | let http_client = TlsClient::new(&spec.common); |
418 | 3 | let credentials_provider = DefaultCredentialsChain::builder() |
419 | 3 | .configure( |
420 | 3 | ProviderConfig::without_region() |
421 | 3 | .with_region(Some(Region::new(Cow::Owned(spec.vserver_name.clone())))) |
422 | 3 | .with_http_client(http_client.clone()), |
423 | 3 | ) |
424 | 3 | .build() |
425 | 3 | .await; |
426 | | |
427 | 3 | let config = aws_sdk_s3::Config::builder() |
428 | 3 | .credentials_provider(credentials_provider) |
429 | 3 | .endpoint_url(&spec.endpoint) |
430 | 3 | .region(Region::new(spec.vserver_name.clone())) |
431 | 3 | .force_path_style(true) |
432 | 3 | .behavior_version(BehaviorVersion::v2025_08_07()) |
433 | 3 | .build(); |
434 | | |
435 | 3 | Ok(Client::from_conf(config)) |
436 | 3 | } |
437 | | |
438 | | #[async_trait] |
439 | | impl<I, NowFn> StoreDriver for OntapS3ExistenceCache<I, NowFn> |
440 | | where |
441 | | I: InstantWrapper, |
442 | | NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static, |
443 | | { |
444 | | async fn has_with_results( |
445 | | self: Pin<&Self>, |
446 | | keys: &[StoreKey<'_>], |
447 | | results: &mut [Option<u64>], |
448 | 5 | ) -> Result<(), Error> { |
449 | | let cache = self.digests.read().await; |
450 | | |
451 | | for (key, result) in keys.iter().zip(results.iter_mut()) { |
452 | | // Handle zero digest case |
453 | | if is_zero_digest(key.borrow()) { |
454 | | *result = Some(0); |
455 | | self.cache_hits.inc(); |
456 | | continue; |
457 | | } |
458 | | |
459 | | let digest = key.borrow().into_digest(); |
460 | | |
461 | | // If not in cache, definitely doesn't exist |
462 | | if !cache.contains(&digest) { |
463 | | *result = None; |
464 | | self.cache_misses.inc(); |
465 | | continue; |
466 | | } |
467 | | // If in cache, check actual store |
468 | | match self.inner_store.has(key.borrow()).await { |
469 | | Ok(size) => { |
470 | | *result = size; |
471 | | self.cache_hits.inc(); |
472 | | } |
473 | | Err(e) => { |
474 | | return Err(e); |
475 | | } |
476 | | } |
477 | | } |
478 | | |
479 | | Ok(()) |
480 | 5 | } |
481 | | |
482 | | async fn get_part( |
483 | | self: Pin<&Self>, |
484 | | key: StoreKey<'_>, |
485 | | writer: &mut DropCloserWriteHalf, |
486 | | offset: u64, |
487 | | length: Option<u64>, |
488 | 2 | ) -> Result<(), Error> { |
489 | | if is_zero_digest(key.borrow()) { |
490 | | writer.send_eof().err_tip(|| "Failed to send zero EOF")?; |
491 | | return Ok(()); |
492 | | } |
493 | | |
494 | | // Only pass through if in cache |
495 | | if !self |
496 | | .digests |
497 | | .read() |
498 | | .await |
499 | | .contains(&key.borrow().into_digest()) |
500 | | { |
501 | | return Err(make_err!(Code::NotFound, "Object not in cache")); |
502 | | } |
503 | | |
504 | | self.inner_store.get_part(key, writer, offset, length).await |
505 | 2 | } |
506 | | |
507 | | async fn update( |
508 | | self: Pin<&Self>, |
509 | | key: StoreKey<'_>, |
510 | | reader: DropCloserReadHalf, |
511 | | size_info: UploadSizeInfo, |
512 | 0 | ) -> Result<(), Error> { |
513 | | let key_owned = key.into_owned(); |
514 | | let result = self |
515 | | .inner_store |
516 | | .update(key_owned.clone(), reader, size_info) |
517 | | .await; |
518 | | if result.is_ok() { |
519 | | self.digests.write().await.insert(key_owned.into_digest()); |
520 | | } |
521 | | result |
522 | 0 | } |
523 | | |
524 | 0 | fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver { |
525 | 0 | self |
526 | 0 | } |
527 | | |
528 | 0 | fn as_any(&self) -> &(dyn core::any::Any + Send + Sync + 'static) { |
529 | 0 | self |
530 | 0 | } |
531 | | |
532 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Send + Sync + 'static> { |
533 | 0 | self |
534 | 0 | } |
535 | | |
536 | 0 | fn register_remove_callback( |
537 | 0 | self: Arc<Self>, |
538 | 0 | callback: Arc<dyn RemoveItemCallback>, |
539 | 0 | ) -> Result<(), Error> { |
540 | 0 | self.inner_store.register_remove_callback(callback) |
541 | 0 | } |
542 | | } |
543 | | |
544 | | impl<I, NowFn> RemoveItemCallback for OntapS3ExistenceCache<I, NowFn> |
545 | | where |
546 | | I: InstantWrapper, |
547 | | NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static, |
548 | | { |
549 | 0 | fn callback<'a>( |
550 | 0 | &'a self, |
551 | 0 | store_key: StoreKey<'a>, |
552 | 0 | ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> { |
553 | 0 | let digest = store_key.borrow().into_digest(); |
554 | 0 | Box::pin(async move { |
555 | 0 | self.digests.write().await.remove(&digest); |
556 | 0 | }) |
557 | 0 | } |
558 | | } |
559 | | |
560 | | #[async_trait] |
561 | | impl<I, NowFn> HealthStatusIndicator for OntapS3ExistenceCache<I, NowFn> |
562 | | where |
563 | | I: InstantWrapper, |
564 | | NowFn: Fn() -> I + Send + Sync + Unpin + Clone + 'static, |
565 | | { |
566 | 0 | fn get_name(&self) -> &'static str { |
567 | 0 | "OntapS3ExistenceCache" |
568 | 0 | } |
569 | | |
570 | 0 | async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { |
571 | | StoreDriver::check_health(Pin::new(self), namespace).await |
572 | 0 | } |
573 | | } |