/build/source/nativelink-store/src/mongo_store.rs
Line | Count | Source |
1 | | // Copyright 2025 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::cmp; |
16 | | use core::ops::Bound; |
17 | | use core::pin::Pin; |
18 | | use core::sync::atomic::{AtomicUsize, Ordering}; |
19 | | use core::time::Duration; |
20 | | use std::borrow::Cow; |
21 | | use std::sync::{Arc, Weak}; |
22 | | |
23 | | use async_trait::async_trait; |
24 | | use bytes::Bytes; |
25 | | use futures::stream::{Stream, StreamExt, TryStreamExt}; |
26 | | use mongodb::bson::{Bson, Document, doc}; |
27 | | use mongodb::options::{ClientOptions, FindOptions, IndexOptions, ReturnDocument, WriteConcern}; |
28 | | use mongodb::{Client as MongoClient, Collection, Database, IndexModel}; |
29 | | use nativelink_config::stores::ExperimentalMongoSpec; |
30 | | use nativelink_error::{Code, Error, make_err, make_input_err}; |
31 | | use nativelink_metric::MetricsComponent; |
32 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
33 | | use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; |
34 | | use nativelink_util::spawn; |
35 | | use nativelink_util::store_trait::{ |
36 | | BoolValue, RemoveItemCallback, SchedulerCurrentVersionProvider, SchedulerIndexProvider, |
37 | | SchedulerStore, SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider, |
38 | | SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo, |
39 | | }; |
40 | | use nativelink_util::task::JoinHandleDropGuard; |
41 | | use parking_lot::{Mutex, RwLock}; |
42 | | use patricia_tree::StringPatriciaMap; |
43 | | use tokio::sync::{Semaphore, SemaphorePermit, watch}; |
44 | | use tokio::time::sleep; |
45 | | use tracing::{error, info, trace, warn}; |
46 | | |
47 | | use crate::cas_utils::is_zero_digest; |
48 | | |
/// The default database name if not specified.
const DEFAULT_DB_NAME: &str = "nativelink";

/// The default collection name if not specified.
const DEFAULT_COLLECTION_NAME: &str = "cas";

/// The default scheduler collection name if not specified.
const DEFAULT_SCHEDULER_COLLECTION_NAME: &str = "scheduler";

/// The default size of the read chunk when reading data from `MongoDB`.
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;

/// The default connection timeout in milliseconds if not specified.
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;

/// The default command timeout in milliseconds if not specified.
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;

/// The name of the field in `MongoDB` documents that stores the key.
/// Uses Mongo's `_id` so key lookups hit the mandatory primary index.
const KEY_FIELD: &str = "_id";

/// The name of the field in `MongoDB` documents that stores the data.
const DATA_FIELD: &str = "data";

/// The name of the field in `MongoDB` documents that stores the version.
const VERSION_FIELD: &str = "version";

/// The name of the field in `MongoDB` documents that stores the size.
const SIZE_FIELD: &str = "size";
78 | | |
/// A [`StoreDriver`] implementation that uses `MongoDB` as a backing store.
#[derive(Debug, MetricsComponent)]
pub struct ExperimentalMongoStore {
    /// The `MongoDB` client. Kept alive for the lifetime of the store even
    /// though operations go through the database/collection handles below.
    #[allow(dead_code)]
    client: MongoClient,

    /// The database to use.
    database: Database,

    /// The collection for CAS data.
    cas_collection: Collection<Document>,

    /// The collection for scheduler data.
    scheduler_collection: Collection<Document>,

    /// A common prefix to append to all keys before they are sent to `MongoDB`.
    #[metric(help = "Prefix to append to all keys before sending to MongoDB")]
    key_prefix: String,

    /// The amount of data to read from `MongoDB` at a time.
    #[metric(help = "The amount of data to read from MongoDB at a time")]
    read_chunk_size: usize,

    /// Enable change streams for real-time updates.
    #[metric(help = "Whether change streams are enabled")]
    enable_change_streams: bool,

    /// A manager for subscriptions to keys in `MongoDB`.
    /// Lazily created on first use; see `subscription_manager()`.
    subscription_manager: Mutex<Option<Arc<ExperimentalMongoSubscriptionManager>>>,

    /// Limits the number of requests at any one time
    request_permits: Arc<Semaphore>,

    /// Keep track of the `request_permits` queue size
    waiting_permits: Arc<AtomicUsize>,
}
116 | | |
117 | | impl ExperimentalMongoStore { |
118 | | /// Create a new `ExperimentalMongoStore` from the given configuration. |
119 | 17 | pub async fn new(mut spec: ExperimentalMongoSpec) -> Result<Arc<Self>, Error> { |
120 | | // Set defaults |
121 | 17 | if spec.connection_string.is_empty() { |
122 | 1 | return Err(make_err!( |
123 | 1 | Code::InvalidArgument, |
124 | 1 | "No connection string was specified in mongo store configuration." |
125 | 1 | )); |
126 | 16 | } |
127 | | |
128 | 16 | if spec.database.is_empty() { |
129 | 1 | spec.database = DEFAULT_DB_NAME.to_string(); |
130 | 15 | } |
131 | | |
132 | 16 | if spec.cas_collection.is_empty() { |
133 | 1 | spec.cas_collection = DEFAULT_COLLECTION_NAME.to_string(); |
134 | 15 | } |
135 | | |
136 | 16 | if spec.scheduler_collection.is_empty() { |
137 | 1 | spec.scheduler_collection = DEFAULT_SCHEDULER_COLLECTION_NAME.to_string(); |
138 | 15 | } |
139 | | |
140 | 16 | if spec.read_chunk_size == 0 { |
141 | 1 | spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE; |
142 | 15 | } |
143 | | |
144 | 16 | if spec.connection_timeout_ms == 0 { |
145 | 1 | spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS; |
146 | 15 | } |
147 | | |
148 | 16 | if spec.command_timeout_ms == 0 { |
149 | 1 | spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS; |
150 | 15 | } |
151 | | |
152 | 16 | if spec.max_concurrent_uploads != 0 { |
153 | 15 | warn!( |
154 | | "max_concurrent_uploads was set for Mongo, and it's a deprecated value we don't use anymore" |
155 | | ); |
156 | 1 | } |
157 | | |
158 | 16 | if let Some(max_permits2 ) = spec.max_requests |
159 | 2 | && max_permits == 0 |
160 | | { |
161 | 1 | return Err(make_err!( |
162 | 1 | Code::InvalidArgument, |
163 | 1 | "max_request_permits was set to zero, which will block mongo_store from working at all" |
164 | 1 | )); |
165 | 15 | } |
166 | | |
167 | | // Configure client options |
168 | 15 | let mut client_options14 = ClientOptions::parse(&spec.connection_string) |
169 | 15 | .await |
170 | 15 | .map_err(|e| {1 |
171 | 1 | make_err!( |
172 | 1 | Code::InvalidArgument, |
173 | | "Failed to parse MongoDB connection string: {e}" |
174 | | ) |
175 | 1 | })?; |
176 | | |
177 | 14 | client_options.server_selection_timeout = |
178 | 14 | Some(Duration::from_millis(spec.connection_timeout_ms)); |
179 | 14 | client_options.connect_timeout = Some(Duration::from_millis(spec.connection_timeout_ms)); |
180 | | |
181 | | // Set write concern if specified |
182 | 14 | if let Some(w13 ) = spec.write_concern_w { |
183 | | // Parse write concern w - can be a number or string like "majority" |
184 | 13 | let w_value = if let Ok(num0 ) = w.parse::<u32>() { |
185 | 0 | Some(num.into()) |
186 | | } else { |
187 | 13 | Some(w.into()) |
188 | | }; |
189 | | |
190 | | // Build write concern based on which options are set |
191 | 13 | let write_concern = match (w_value, spec.write_concern_j, spec.write_concern_timeout_ms) |
192 | | { |
193 | 13 | (Some(w), Some(j), Some(timeout)) => WriteConcern::builder() |
194 | 13 | .w(Some(w)) |
195 | 13 | .journal(j) |
196 | 13 | .w_timeout(Some(Duration::from_millis(u64::from(timeout)))) |
197 | 13 | .build(), |
198 | 0 | (Some(w), Some(j), None) => WriteConcern::builder().w(Some(w)).journal(j).build(), |
199 | 0 | (Some(w), None, Some(timeout)) => WriteConcern::builder() |
200 | 0 | .w(Some(w)) |
201 | 0 | .w_timeout(Some(Duration::from_millis(u64::from(timeout)))) |
202 | 0 | .build(), |
203 | 0 | (Some(w), None, None) => WriteConcern::builder().w(Some(w)).build(), |
204 | 0 | _ => unreachable!(), // We know w is Some because we're in the if let Some(w) block |
205 | | }; |
206 | | |
207 | 13 | client_options.write_concern = Some(write_concern); |
208 | 1 | } else if spec.write_concern_j.is_some() || spec.write_concern_timeout_ms0 .is_some0 () { |
209 | 1 | return Err(make_err!( |
210 | 1 | Code::InvalidArgument, |
211 | 1 | "write_concern_w not set, but j and/or timeout set. Please set 'write_concern_w' to a non-default value. See https://www.mongodb.com/docs/manual/reference/write-concern/#w-option for options." |
212 | 1 | )); |
213 | 0 | } |
214 | | |
215 | | // Create client |
216 | 13 | let client = MongoClient::with_options(client_options).map_err(|e| {0 |
217 | 0 | make_err!( |
218 | 0 | Code::InvalidArgument, |
219 | | "Failed to create MongoDB client: {e}" |
220 | | ) |
221 | 0 | })?; |
222 | | |
223 | | // Get database and collections |
224 | 13 | let database = client.database(&spec.database); |
225 | 13 | let cas_collection = database.collection::<Document>(&spec.cas_collection); |
226 | 13 | let scheduler_collection = database.collection::<Document>(&spec.scheduler_collection); |
227 | | |
228 | | // Create indexes |
229 | 13 | Self::create_indexes(&cas_collection, &scheduler_collection).await?0 ; |
230 | | |
231 | 13 | let store = Self { |
232 | 13 | client, |
233 | 13 | database, |
234 | 13 | cas_collection, |
235 | 13 | scheduler_collection, |
236 | 13 | key_prefix: spec.key_prefix.clone().unwrap_or_default(), |
237 | 13 | read_chunk_size: spec.read_chunk_size, |
238 | 13 | enable_change_streams: spec.enable_change_streams, |
239 | 13 | subscription_manager: Mutex::new(None), |
240 | 13 | request_permits: Arc::new(Semaphore::new( |
241 | 13 | spec.max_requests.unwrap_or(Semaphore::MAX_PERMITS), |
242 | 13 | )), |
243 | 13 | waiting_permits: Arc::new(AtomicUsize::new(0)), |
244 | 13 | }; |
245 | | |
246 | 13 | Ok(Arc::new(store)) |
247 | 17 | } |
248 | | |
249 | | /// Create necessary indexes for efficient operations. |
250 | 13 | async fn create_indexes( |
251 | 13 | cas_collection: &Collection<Document>, |
252 | 13 | _scheduler_collection: &Collection<Document>, |
253 | 13 | ) -> Result<(), Error> { |
254 | | // CAS collection indexes |
255 | 13 | cas_collection |
256 | 13 | .create_index(IndexModel::builder().keys(doc! { SIZE_FIELD: 1 }).build()) |
257 | 13 | .await |
258 | 13 | .map_err(|e| {0 |
259 | 0 | make_err!( |
260 | 0 | Code::Internal, |
261 | | "Failed to create size index on CAS collection: {e}" |
262 | | ) |
263 | 0 | })?; |
264 | | |
265 | | // Scheduler collection will have dynamic indexes created as needed |
266 | | |
267 | 13 | Ok(()) |
268 | 13 | } |
269 | | |
270 | | /// Encode a [`StoreKey`] so it can be sent to `MongoDB`. |
271 | 55 | fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> { |
272 | 55 | let key_body = key.as_str(); |
273 | 55 | if self.key_prefix.is_empty() { |
274 | 55 | key_body |
275 | | } else { |
276 | 0 | match key_body { |
277 | 0 | Cow::Owned(mut encoded_key) => { |
278 | 0 | encoded_key.insert_str(0, &self.key_prefix); |
279 | 0 | Cow::Owned(encoded_key) |
280 | | } |
281 | 0 | Cow::Borrowed(body) => { |
282 | 0 | let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len()); |
283 | 0 | encoded_key.push_str(&self.key_prefix); |
284 | 0 | encoded_key.push_str(body); |
285 | 0 | Cow::Owned(encoded_key) |
286 | | } |
287 | | } |
288 | | } |
289 | 55 | } |
290 | | |
291 | | /// Decode a key from `MongoDB` by removing the prefix. |
292 | 0 | fn decode_key(&self, key: &str) -> Option<String> { |
293 | 0 | if self.key_prefix.is_empty() { |
294 | 0 | Some(key.to_string()) |
295 | | } else { |
296 | 0 | key.strip_prefix(&self.key_prefix).map(ToString::to_string) |
297 | | } |
298 | 0 | } |
299 | | |
300 | 53 | async fn acquire_permit(&self) -> Result<SemaphorePermit<'_>, Error> { |
301 | 53 | let waiting = self.waiting_permits.fetch_add(1, Ordering::Relaxed); |
302 | | |
303 | 53 | if waiting > 0 && waiting0 .is_multiple_of0 (100) { |
304 | 0 | info!(waiting, "Number of waiting permits for Mongo"); |
305 | | } else { |
306 | 53 | trace!(waiting, "Number of waiting permits for Mongo"); |
307 | | } |
308 | 53 | let permit = self.request_permits.acquire().await; |
309 | 53 | self.waiting_permits.fetch_sub(1, Ordering::Relaxed); |
310 | 53 | Ok(permit?0 ) |
311 | 53 | } |
312 | | } |
313 | | |
314 | | #[async_trait] |
315 | | impl StoreDriver for ExperimentalMongoStore { |
316 | | async fn has_with_results( |
317 | | self: Pin<&Self>, |
318 | | keys: &[StoreKey<'_>], |
319 | | results: &mut [Option<u64>], |
320 | 18 | ) -> Result<(), Error> { |
321 | | for (key, result) in keys.iter().zip(results.iter_mut()) { |
322 | | // Handle zero digest specially |
323 | | if is_zero_digest(key.borrow()) { |
324 | | *result = Some(0); |
325 | | continue; |
326 | | } |
327 | | |
328 | | let encoded_key = self.encode_key(key); |
329 | | let filter = doc! { KEY_FIELD: encoded_key.as_ref() }; |
330 | | |
331 | | // We could do this with acquire_many, but that's unsafe if the number of keys is greater |
332 | | // than the number of permits, as it'll block forever. Doing this one at a time is guaranteed |
333 | | // not to block provided no-one sets permits to 0, and we check for that case at startup. |
334 | | let semaphore = self.acquire_permit().await?; |
335 | | |
336 | | match self.cas_collection.find_one(filter).await { |
337 | | Ok(Some(doc)) => { |
338 | 17 | *result = doc.get_i64(SIZE_FIELD).ok().map(|v| v as u64); |
339 | | } |
340 | | Ok(None) => { |
341 | | *result = None; |
342 | | } |
343 | | Err(e) => { |
344 | | return Err(make_err!( |
345 | | Code::Internal, |
346 | | "MongoDB error in has_with_results: {e}" |
347 | | )); |
348 | | } |
349 | | } |
350 | | drop(semaphore); |
351 | | } |
352 | | |
353 | | Ok(()) |
354 | 18 | } |
355 | | |
    /// List keys within `range`, invoking `handler` for each decoded key.
    ///
    /// Returns the number of keys passed to `handler` (including the final
    /// one for which `handler` returned `false` and stopped iteration).
    async fn list(
        self: Pin<&Self>,
        range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
        handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
    ) -> Result<u64, Error> {
        let mut filter = Document::new();

        // Build range query. Bounds are encoded (prefixed) before comparison
        // so they match the stored key format.
        let mut key_filter = Document::new();
        match &range.0 {
            Bound::Included(start) => {
                let encoded = self.encode_key(start);
                key_filter.insert("$gte", encoded.as_ref());
            }
            Bound::Excluded(start) => {
                let encoded = self.encode_key(start);
                key_filter.insert("$gt", encoded.as_ref());
            }
            Bound::Unbounded => {}
        }

        match &range.1 {
            Bound::Included(end) => {
                let encoded = self.encode_key(end);
                key_filter.insert("$lte", encoded.as_ref());
            }
            Bound::Excluded(end) => {
                let encoded = self.encode_key(end);
                key_filter.insert("$lt", encoded.as_ref());
            }
            Bound::Unbounded => {}
        }

        if !key_filter.is_empty() {
            filter.insert(KEY_FIELD, key_filter);
        }

        // Add prefix filter if needed, so keys from other stores sharing the
        // collection are excluded. The prefix is regex-escaped to be safe.
        if !self.key_prefix.is_empty() {
            let regex_filter = doc! {
                KEY_FIELD: {
                    "$regex": format!("^{}", regex::escape(&self.key_prefix)),
                }
            };
            if filter.is_empty() {
                filter = regex_filter;
            } else {
                filter = doc! { "$and": [filter, regex_filter] };
            }
        }

        let semaphore = self.acquire_permit().await?;

        // Project only the key field; we never need document bodies here.
        let mut cursor = self
            .cas_collection
            .find(filter)
            .projection(doc! { KEY_FIELD: 1 })
            .await
            .map_err(|e| make_err!(Code::Internal, "Failed to create cursor in list: {e}"))?;

        let mut count = 0u64;
        while let Some(doc) = cursor
            .try_next()
            .await
            .map_err(|e| make_err!(Code::Internal, "Failed to get next document in list: {e}"))?
        {
            // Keys that fail prefix-stripping belong to another namespace and
            // are silently skipped rather than reported as errors.
            if let Ok(key) = doc.get_str(KEY_FIELD)
                && let Some(decoded_key) = self.decode_key(key)
            {
                let store_key = StoreKey::new_str(&decoded_key);
                count += 1;
                if !handler(&store_key) {
                    break;
                }
            }
        }

        drop(semaphore);

        Ok(count)
    }
437 | | |
438 | | async fn update( |
439 | | self: Pin<&Self>, |
440 | | key: StoreKey<'_>, |
441 | | mut reader: DropCloserReadHalf, |
442 | | upload_size: UploadSizeInfo, |
443 | 20 | ) -> Result<(), Error> { |
444 | | let encoded_key = self.encode_key(&key); |
445 | | |
446 | | // Handle zero digest |
447 | | if is_zero_digest(key.borrow()) { |
448 | 0 | let chunk = reader.peek().await.map_err(|e| { |
449 | 0 | make_err!( |
450 | 0 | Code::Internal, |
451 | | "Failed to peek in ExperimentalMongoStore::update: {e}" |
452 | | ) |
453 | 0 | })?; |
454 | | if chunk.is_empty() { |
455 | 0 | reader.drain().await.map_err(|e| { |
456 | 0 | make_err!( |
457 | 0 | Code::Internal, |
458 | | "Failed to drain in ExperimentalMongoStore::update: {e}" |
459 | | ) |
460 | 0 | })?; |
461 | | return Ok(()); |
462 | | } |
463 | | } |
464 | | |
465 | | // Special handling for MaxSize(0) - this should error if stream is closed |
466 | | if upload_size == UploadSizeInfo::MaxSize(0) { |
467 | | // Try to read from the stream - if it's closed immediately, this should error |
468 | | match reader.recv().await { |
469 | | Ok(_chunk) => { |
470 | | return Err(make_input_err!( |
471 | | "Received data when MaxSize is 0 in MongoDB store" |
472 | | )); |
473 | | } |
474 | | Err(e) => { |
475 | | return Err(make_err!( |
476 | | Code::InvalidArgument, |
477 | | "Stream closed for MaxSize(0) upload: {e}" |
478 | | )); |
479 | | } |
480 | | } |
481 | | } |
482 | | |
483 | | // Read all data into memory with proper EOF handling |
484 | | let mut data = Vec::new(); |
485 | | while let Ok(chunk) = reader.recv().await { |
486 | | if chunk.is_empty() { |
487 | | break; // Empty chunk signals EOF |
488 | | } |
489 | | data.extend_from_slice(&chunk); |
490 | | } |
491 | | |
492 | | let size = data.len() as i64; |
493 | | |
494 | | // Create document |
495 | | let doc = doc! { |
496 | | KEY_FIELD: encoded_key.as_ref(), |
497 | | DATA_FIELD: Bson::Binary(mongodb::bson::Binary { |
498 | | subtype: mongodb::bson::spec::BinarySubtype::Generic, |
499 | | bytes: data, |
500 | | }), |
501 | | SIZE_FIELD: size, |
502 | | }; |
503 | | |
504 | | let semaphore = self.acquire_permit().await?; |
505 | | |
506 | | // Upsert the document |
507 | | self.cas_collection |
508 | | .update_one( |
509 | | doc! { KEY_FIELD: encoded_key.as_ref() }, |
510 | | doc! { "$set": doc }, |
511 | | ) |
512 | | .upsert(true) |
513 | | .await |
514 | 0 | .map_err(|e| make_err!(Code::Internal, "Failed to update document in MongoDB: {e}"))?; |
515 | | |
516 | | drop(semaphore); |
517 | | |
518 | | Ok(()) |
519 | 20 | } |
520 | | |
    /// Stream `length` bytes of `key`'s data starting at `offset` to `writer`.
    ///
    /// The whole document is fetched into memory and then sliced; an offset
    /// past the end of the data results in an empty response (EOF only).
    /// Returns `NotFound` if the key does not exist.
    async fn get_part(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        writer: &mut DropCloserWriteHalf,
        offset: u64,
        length: Option<u64>,
    ) -> Result<(), Error> {
        // Handle zero digest
        if is_zero_digest(key.borrow()) {
            return writer.send_eof().map_err(|e| {
                make_err!(
                    Code::Internal,
                    "Failed to send zero EOF in mongo store get_part: {e}"
                )
            });
        }

        let encoded_key = self.encode_key(&key);
        let filter = doc! { KEY_FIELD: encoded_key.as_ref() };

        let semaphore = self.acquire_permit().await?;

        let doc = self
            .cas_collection
            .find_one(filter)
            .await
            .map_err(|e| make_err!(Code::Internal, "Failed to find document in get_part: {e}"))?
            .ok_or_else(|| {
                make_err!(
                    Code::NotFound,
                    "Data not found in MongoDB store for digest: {key:?}"
                )
            })?;

        let data = match doc.get(DATA_FIELD) {
            Some(Bson::Binary(binary)) => &binary.bytes,
            _ => {
                return Err(make_err!(
                    Code::Internal,
                    "Invalid data field in MongoDB document"
                ));
            }
        };

        // On 32-bit targets a u64 offset may not fit in usize; saturating to
        // usize::MAX simply trips the "offset past end" path below.
        let offset = usize::try_from(offset).unwrap_or(usize::MAX);
        let data_len = data.len();

        if offset > data_len {
            return writer.send_eof().map_err(|e| {
                make_err!(
                    Code::Internal,
                    "Failed to send EOF in mongo store get_part: {e}"
                )
            });
        }

        // Clamp the end of the requested window to the actual data length.
        let end = if let Some(len) = length {
            cmp::min(
                offset.saturating_add(usize::try_from(len).unwrap_or(usize::MAX)),
                data_len,
            )
        } else {
            data_len
        };

        if offset < end {
            let chunk = &data[offset..end];

            // Send data in chunks
            for chunk_data in chunk.chunks(self.read_chunk_size) {
                writer.send(chunk_data.to_vec().into()).await.map_err(|e| {
                    make_err!(
                        Code::Internal,
                        "Failed to write data in ExperimentalMongoStore::get_part: {e}"
                    )
                })?;
            }
        }

        drop(semaphore);

        writer.send_eof().map_err(|e| {
            make_err!(
                Code::Internal,
                "Failed to write EOF in mongo store get_part: {e}"
            )
        })
    }
609 | | |
    /// This store has no inner delegate; it is its own terminal driver.
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }
613 | | |
    /// Expose `self` for downcasting by callers that need the concrete type.
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) {
        self
    }
617 | | |
    /// Arc-flavored variant of `as_any` for shared-ownership downcasting.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> {
        self
    }
621 | | |
    /// Register this store as a health indicator (see `check_health` below).
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
        registry.register_indicator(self);
    }
625 | | |
    /// Accept and ignore the callback: this store never removes items on its
    /// own, so there is nothing that would ever need to notify it.
    fn register_remove_callback(
        self: Arc<Self>,
        _callback: Arc<dyn RemoveItemCallback>,
    ) -> Result<(), Error> {
        // drop because we don't remove anything from Mongo
        Ok(())
    }
633 | | } |
634 | | |
635 | | #[async_trait] |
636 | | impl HealthStatusIndicator for ExperimentalMongoStore { |
637 | 0 | fn get_name(&self) -> &'static str { |
638 | 0 | "ExperimentalMongoStore" |
639 | 0 | } |
640 | | |
641 | 0 | async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { |
642 | | // Note we do not acquire a request_permit here, as the health check needs to always go through |
643 | | // even if everything else is fully loaded |
644 | | match self.database.run_command(doc! { "ping": 1 }).await { |
645 | | Ok(_) => HealthStatus::new_ok(self, "Connection healthy".into()), |
646 | | Err(e) => HealthStatus::new_failed( |
647 | | self, |
648 | | format!("{namespace} - MongoDB connection error: {e}").into(), |
649 | | ), |
650 | | } |
651 | 0 | } |
652 | | } |
653 | | |
654 | | // ------------------------------------------------------------------- |
655 | | // |
656 | | // `ExperimentalMongoDB` scheduler implementation. Likely to change |
657 | | // ------------------------------------------------------------------- |
658 | | |
/// An individual subscription to a key in `ExperimentalMongoDB`.
#[derive(Debug)]
pub struct ExperimentalMongoSubscription {
    // Watch receiver whose stored value is the subscribed key; `None` once
    // the subscription has been dropped/consumed.
    receiver: Option<watch::Receiver<String>>,
    // Weak ref back to the manager's key map so `Drop` can garbage-collect
    // the publisher without keeping the manager alive.
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<ExperimentalMongoSubscriptionPublisher>>>,
}
665 | | |
666 | | impl SchedulerSubscription for ExperimentalMongoSubscription { |
667 | 0 | async fn changed(&mut self) -> Result<(), Error> { |
668 | 0 | let receiver = self.receiver.as_mut().ok_or_else(|| { |
669 | 0 | make_err!( |
670 | 0 | Code::Internal, |
671 | | "In ExperimentalMongoSubscription::changed::as_mut" |
672 | | ) |
673 | 0 | })?; |
674 | 0 | receiver.changed().await.map_err(|err| { |
675 | 0 | Error::from_std_err(Code::Internal, &err) |
676 | 0 | .append("In ExperimentalMongoSubscription::changed::changed") |
677 | 0 | }) |
678 | 0 | } |
679 | | } |
680 | | |
impl Drop for ExperimentalMongoSubscription {
    /// Remove this subscription's publisher from the manager's map when it is
    /// the last subscriber for its key.
    fn drop(&mut self) {
        let Some(receiver) = self.receiver.take() else {
            warn!("ExperimentalMongoSubscription has already been dropped, nothing to do.");
            return;
        };
        // The watch channel's stored value is the subscribed key itself.
        let key = receiver.borrow().clone();
        // Drop our receiver FIRST so the receiver_count() check below does
        // not count this (dying) subscription.
        drop(receiver);

        let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else {
            return;
        };
        let mut subscribed_keys = subscribed_keys.write();
        let Some(value) = subscribed_keys.get(&key) else {
            error!(
                "Key {key} was not found in subscribed keys when checking if it should be removed."
            );
            return;
        };

        // No receivers left -> nobody is subscribed; drop the publisher.
        if value.receiver_count() == 0 {
            subscribed_keys.remove(key);
        }
    }
}
706 | | |
/// A publisher for a key in `ExperimentalMongoDB`.
#[derive(Debug)]
struct ExperimentalMongoSubscriptionPublisher {
    // Watch sender whose value is the key; notifications are delivered via
    // `send_modify` without changing the value.
    sender: Mutex<watch::Sender<String>>,
}
712 | | |
713 | | impl ExperimentalMongoSubscriptionPublisher { |
714 | 0 | fn new( |
715 | 0 | key: String, |
716 | 0 | weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>, |
717 | 0 | ) -> (Self, ExperimentalMongoSubscription) { |
718 | 0 | let (sender, receiver) = watch::channel(key); |
719 | 0 | let publisher = Self { |
720 | 0 | sender: Mutex::new(sender), |
721 | 0 | }; |
722 | 0 | let subscription = ExperimentalMongoSubscription { |
723 | 0 | receiver: Some(receiver), |
724 | 0 | weak_subscribed_keys, |
725 | 0 | }; |
726 | 0 | (publisher, subscription) |
727 | 0 | } |
728 | | |
729 | 0 | fn subscribe( |
730 | 0 | &self, |
731 | 0 | weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>, |
732 | 0 | ) -> ExperimentalMongoSubscription { |
733 | 0 | let receiver = self.sender.lock().subscribe(); |
734 | 0 | ExperimentalMongoSubscription { |
735 | 0 | receiver: Some(receiver), |
736 | 0 | weak_subscribed_keys, |
737 | 0 | } |
738 | 0 | } |
739 | | |
740 | 0 | fn receiver_count(&self) -> usize { |
741 | 0 | self.sender.lock().receiver_count() |
742 | 0 | } |
743 | | |
744 | 0 | fn notify(&self) { |
745 | 0 | self.sender.lock().send_modify(|_| {}); |
746 | 0 | } |
747 | | } |
748 | | |
/// Manages key subscriptions backed by a `MongoDB` change stream.
#[derive(Debug)]
pub struct ExperimentalMongoSubscriptionManager {
    // Prefix-searchable map from key to its publisher.
    subscribed_keys: Arc<RwLock<StringPatriciaMap<ExperimentalMongoSubscriptionPublisher>>>,
    // Background task watching the change stream; aborted when the manager
    // is dropped (JoinHandleDropGuard).
    _subscription_spawn: JoinHandleDropGuard<()>,
}
754 | | |
impl ExperimentalMongoSubscriptionManager {
    /// Create a manager and spawn a background task that watches the given
    /// collection's change stream, notifying matching subscribers on every
    /// insert/update/replace/delete. The task retries (with a 5s backoff) if
    /// the change stream cannot be created or errors out, and exits once the
    /// owning manager is dropped (detected via the weak reference).
    pub fn new(database: Database, collection_name: String, key_prefix: String) -> Self {
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);

        Self {
            subscribed_keys,
            _subscription_spawn: spawn!("mongo_subscribe_spawn", async move {
                let collection = database.collection::<Document>(&collection_name);

                loop {
                    // Try to create change stream
                    let change_stream_result = collection.watch().await;

                    match change_stream_result {
                        Ok(mut change_stream) => {
                            info!("MongoDB change stream connected");

                            while let Some(event_result) = change_stream.next().await {
                                match event_result {
                                    Ok(event) => {
                                        use mongodb::change_stream::event::OperationType;
                                        match event.operation_type {
                                            OperationType::Insert
                                            | OperationType::Update
                                            | OperationType::Replace
                                            | OperationType::Delete => {
                                                if let Some(doc_key) = event.document_key
                                                    && let Ok(key) = doc_key.get_str(KEY_FIELD)
                                                {
                                                    // Remove prefix if present
                                                    let key = if key_prefix.is_empty() {
                                                        key
                                                    } else {
                                                        key.strip_prefix(&key_prefix).unwrap_or(key)
                                                    };

                                                    let Some(subscribed_keys) =
                                                        subscribed_keys_weak.upgrade()
                                                    else {
                                                        warn!(
                                                            "Parent dropped, exiting ExperimentalMongoSubscriptionManager"
                                                        );
                                                        return;
                                                    };

                                                    // Notify every publisher whose key is a
                                                    // prefix of the changed key.
                                                    let subscribed_keys_mux =
                                                        subscribed_keys.read();
                                                    subscribed_keys_mux
                                                        .common_prefix_values(key)
                                                        .for_each(ExperimentalMongoSubscriptionPublisher::notify);
                                                }
                                            }
                                            _ => {}
                                        }
                                    }
                                    Err(e) => {
                                        error!("Error in change stream: {e}");
                                        break;
                                    }
                                }
                            }
                        }
                        Err(e) => {
                            warn!("Failed to create change stream: {e}. Will retry in 5 seconds.");
                        }
                    }

                    // Check if parent is still alive
                    if subscribed_keys_weak.upgrade().is_none() {
                        warn!("Parent dropped, exiting ExperimentalMongoSubscriptionManager");
                        return;
                    }

                    // Sleep before retry
                    sleep(Duration::from_secs(5)).await;

                    // Notify all subscribers on reconnection, since changes may
                    // have been missed while the stream was down.
                    if let Some(subscribed_keys) = subscribed_keys_weak.upgrade() {
                        let subscribed_keys_mux = subscribed_keys.read();
                        for publisher in subscribed_keys_mux.values() {
                            publisher.notify();
                        }
                    }
                }
            }),
        }
    }
}
844 | | |
impl SchedulerSubscriptionManager for ExperimentalMongoSubscriptionManager {
    type Subscription = ExperimentalMongoSubscription;

    /// Subscribe to changes of `key`, reusing an existing publisher for the
    /// key when one is present, otherwise creating and registering one.
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
    where
        K: SchedulerStoreKeyProvider,
    {
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
        let mut subscribed_keys = self.subscribed_keys.write();
        let key = key.get_key();
        let key_str = key.as_str();

        let mut subscription = if let Some(publisher) = subscribed_keys.get(&key_str) {
            publisher.subscribe(weak_subscribed_keys)
        } else {
            let (publisher, subscription) = ExperimentalMongoSubscriptionPublisher::new(
                key_str.to_string(),
                weak_subscribed_keys,
            );
            subscribed_keys.insert(key_str, publisher);
            subscription
        };

        // Mark the brand-new receiver as changed so the first call to
        // `changed()` returns immediately and the caller reads current state.
        subscription
            .receiver
            .as_mut()
            .ok_or_else(|| {
                make_err!(
                    Code::Internal,
                    "Receiver should be set in ExperimentalMongoSubscriptionManager::subscribe"
                )
            })?
            .mark_changed();

        Ok(subscription)
    }

    fn is_reliable() -> bool {
        true
    }
}
886 | | |
impl SchedulerStore for ExperimentalMongoStore {
    type SubscriptionManager = ExperimentalMongoSubscriptionManager;

    // Lazily creates (and caches behind a mutex) the change-stream-backed
    // subscription manager. Errors if change streams are disabled in the spec,
    // since subscriptions cannot be served without them.
    async fn subscription_manager(
        &self,
    ) -> Result<Arc<ExperimentalMongoSubscriptionManager>, Error> {
        let mut subscription_manager = self.subscription_manager.lock();
        if let Some(subscription_manager) = &*subscription_manager {
            // Already initialized; hand out the shared instance.
            Ok(subscription_manager.clone())
        } else {
            if !self.enable_change_streams {
                return Err(make_input_err!(
                    "ExperimentalMongoStore must have change streams enabled for scheduler subscriptions"
                ));
            }

            let sub = Arc::new(ExperimentalMongoSubscriptionManager::new(
                self.database.clone(),
                self.scheduler_collection.name().to_string(),
                self.key_prefix.clone(),
            ));
            *subscription_manager = Some(sub.clone());
            Ok(sub)
        }
    }

    // Writes `data` to the scheduler collection.
    //
    // Versioned keys use an atomic compare-and-increment: the update only
    // matches a document whose version equals `current_version`, and bumps
    // the version by one. Non-versioned keys are a plain upsert.
    //
    // Returns the new version for versioned keys (None if the CAS filter
    // matched nothing), or Some(0) for non-versioned keys.
    async fn update_data<T>(&self, data: T) -> Result<Option<i64>, Error>
    where
        T: SchedulerStoreDataProvider
            + SchedulerStoreKeyProvider
            + SchedulerCurrentVersionProvider
            + Send,
    {
        let key = data.get_key();
        let encoded_key = self.encode_key(&key);
        let maybe_index = data.get_indexes().map_err(|e| {
            make_err!(
                Code::Internal,
                "Error getting indexes in ExperimentalMongoStore::update_data: {e}"
            )
        })?;

        if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {
            let current_version = data.current_version();
            let data_bytes = data.try_into_bytes().map_err(|e| {
                make_err!(
                    Code::Internal,
                    "Could not convert value to bytes in ExperimentalMongoStore::update_data: {e}"
                )
            })?;

            // Payload is stored as generic BSON binary; version is bumped
            // atomically in the same update.
            let mut update_doc = doc! {
                "$set": {
                    DATA_FIELD: Bson::Binary(mongodb::bson::Binary {
                        subtype: mongodb::bson::spec::BinarySubtype::Generic,
                        bytes: data_bytes.to_vec(),
                    }),
                },
                "$inc": {
                    VERSION_FIELD: 1i64,
                }
            };

            // Add indexes
            // NOTE(review): index values are stored as BSON Binary here, but
            // search_by_index_prefix below queries them with `$regex`, which
            // matches string fields — confirm these two agree on the stored type.
            for (name, value) in maybe_index {
                update_doc.get_document_mut("$set").unwrap().insert(
                    name,
                    Bson::Binary(mongodb::bson::Binary {
                        subtype: mongodb::bson::spec::BinarySubtype::Generic,
                        bytes: value.to_vec(),
                    }),
                );
            }

            // CAS filter: only update the document if it still has the
            // version the caller observed.
            let filter = doc! {
                KEY_FIELD: encoded_key.as_ref(),
                VERSION_FIELD: current_version,
            };

            // Bound concurrent DB operations via the store's semaphore.
            let semaphore = self.acquire_permit().await?;

            // NOTE(review): upsert(true) combined with the version-equality
            // filter means a version mismatch on an existing key attempts an
            // insert — presumably a unique index on the key field rejects
            // that as the conflict signal; TODO confirm.
            let result = match self
                .scheduler_collection
                .find_one_and_update(filter, update_doc)
                .upsert(true)
                .return_document(ReturnDocument::After)
                .await
            {
                // Read back the post-increment version; fall back to 1 if
                // the field is missing or not an i64 (e.g. fresh upsert).
                Ok(Some(doc)) => Ok(doc.get_i64(VERSION_FIELD).ok().or(Some(1i64))),
                Ok(None) => Ok(None),
                Err(e) => Err(make_err!(
                    Code::Internal,
                    "MongoDB error in update_data: {e}"
                )),
            };
            drop(semaphore);
            result
        } else {
            let data_bytes = data.try_into_bytes().map_err(|e| {
                make_err!(
                    Code::Internal,
                    "Could not convert value to bytes in ExperimentalMongoStore::update_data: {e}"
                )
            })?;

            // Non-versioned path: plain last-writer-wins upsert of key + payload.
            let mut doc = doc! {
                KEY_FIELD: encoded_key.as_ref(),
                DATA_FIELD: Bson::Binary(mongodb::bson::Binary {
                    subtype: mongodb::bson::spec::BinarySubtype::Generic,
                    bytes: data_bytes.to_vec(),
                }),
            };

            // Add indexes
            for (name, value) in maybe_index {
                doc.insert(
                    name,
                    Bson::Binary(mongodb::bson::Binary {
                        subtype: mongodb::bson::spec::BinarySubtype::Generic,
                        bytes: value.to_vec(),
                    }),
                );
            }

            // Bound concurrent DB operations via the store's semaphore.
            let semaphore = self.acquire_permit().await?;

            self.scheduler_collection
                .update_one(
                    doc! { KEY_FIELD: encoded_key.as_ref() },
                    doc! { "$set": doc },
                )
                .upsert(true)
                .await
                .map_err(|e| {
                    make_err!(Code::Internal, "Failed to update scheduler document: {e}")
                })?;

            drop(semaphore);

            // Non-versioned keys always report version 0.
            Ok(Some(0))
        }
    }

    // Streams all documents whose index field starts with `index.index_value()`,
    // decoded via `K::decode`. Ensures a MongoDB index on the field first and
    // applies K's optional sort key.
    async fn search_by_index_prefix<K>(
        &self,
        index: K,
    ) -> Result<
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
        Error,
    >
    where
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
    {
        let index_value = index.index_value();

        // Create index if it doesn't exist
        // (create_index is idempotent, so doing this per call is safe,
        // though it costs a round trip each time.)
        let index_name = format!("{}_{}", K::KEY_PREFIX, K::INDEX_NAME);
        self.scheduler_collection
            .create_index(
                IndexModel::builder()
                    .keys(doc! { K::INDEX_NAME: 1 })
                    .options(IndexOptions::builder().name(index_name).build())
                    .build(),
            )
            .await
            .map_err(|e| make_err!(Code::Internal, "Failed to create scheduler index: {e}"))?;

        // Build filter
        // Anchored, escaped regex implements the "prefix" semantics.
        // NOTE(review): `$regex` matches string fields, while update_data
        // above writes index values as BSON Binary — verify the stored type.
        let filter = doc! {
            K::INDEX_NAME: {
                "$regex": format!("^{}", regex::escape(index_value.as_ref())),
            }
        };

        // Add sort if specified
        let find_options = if let Some(sort_key) = K::MAYBE_SORT_KEY {
            FindOptions::builder().sort(doc! { sort_key: 1 }).build()
        } else {
            FindOptions::default()
        };

        let cursor = self
            .scheduler_collection
            .find(filter)
            .with_options(find_options)
            .await
            .map_err(|e| {
                make_err!(
                    Code::Internal,
                    "Failed to create cursor in search_by_index_prefix: {e}"
                )
            })?;

        // Lazily map each cursor document into the decoded output type.
        Ok(cursor.map(move |result| {
            let doc = result.map_err(|e| {
                make_err!(
                    Code::Internal,
                    "Error reading document in search_by_index_prefix: {e}"
                )
            })?;

            // A matched document without binary payload is treated as an
            // internal error (unlike get_and_decode, which returns None).
            let data = match doc.get(DATA_FIELD) {
                Some(Bson::Binary(binary)) => Bytes::from(binary.bytes.clone()),
                _ => {
                    return Err(make_err!(
                        Code::Internal,
                        "Missing or invalid data field in search_by_index_prefix"
                    ));
                }
            };

            // Versioned keys read the stored version; missing/invalid
            // versions (and non-versioned keys) decode as 0.
            let version = if <K as SchedulerIndexProvider>::Versioned::VALUE {
                doc.get_i64(VERSION_FIELD).unwrap_or(0)
            } else {
                0
            };

            K::decode(version, data).map_err(|e| {
                make_err!(
                    Code::Internal,
                    "Failed to decode in search_by_index_prefix: {e}"
                )
            })
        }))
    }

    // Fetches the document for `key` and decodes it via `K::decode`.
    // Returns Ok(None) when the document is missing or its payload is not
    // binary data.
    async fn get_and_decode<K>(
        &self,
        key: K,
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
    where
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
    {
        let key = key.get_key();
        let encoded_key = self.encode_key(&key);
        let filter = doc! { KEY_FIELD: encoded_key.as_ref() };

        let doc = self
            .scheduler_collection
            .find_one(filter)
            .await
            .map_err(|e| {
                make_err!(
                    Code::Internal,
                    "Failed to find document in get_and_decode: {e}"
                )
            })?;

        let Some(doc) = doc else {
            return Ok(None);
        };

        // Malformed payload (non-binary data field) is treated as absent
        // rather than an error here.
        let data = match doc.get(DATA_FIELD) {
            Some(Bson::Binary(binary)) => Bytes::from(binary.bytes.clone()),
            _ => return Ok(None),
        };

        // Versioned keys read the stored version; default to 0 when the
        // field is missing or not an i64.
        let version = if <K as SchedulerStoreKeyProvider>::Versioned::VALUE {
            doc.get_i64(VERSION_FIELD).unwrap_or(0)
        } else {
            0
        };

        Ok(Some(K::decode(version, data).map_err(|e| {
            make_err!(Code::Internal, "Failed to decode in get_and_decode: {e}")
        })?))
    }
}