/build/source/nativelink-store/src/mongo_store.rs
Line | Count | Source |
1 | | // Copyright 2025 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::cmp; |
16 | | use core::ops::Bound; |
17 | | use core::pin::Pin; |
18 | | use core::time::Duration; |
19 | | use std::borrow::Cow; |
20 | | use std::sync::{Arc, Weak}; |
21 | | |
22 | | use async_trait::async_trait; |
23 | | use bytes::Bytes; |
24 | | use futures::stream::{Stream, StreamExt, TryStreamExt}; |
25 | | use mongodb::bson::{Bson, Document, doc}; |
26 | | use mongodb::options::{ClientOptions, FindOptions, IndexOptions, ReturnDocument, WriteConcern}; |
27 | | use mongodb::{Client as MongoClient, Collection, Database, IndexModel}; |
28 | | use nativelink_config::stores::ExperimentalMongoSpec; |
29 | | use nativelink_error::{Code, Error, make_err, make_input_err}; |
30 | | use nativelink_metric::MetricsComponent; |
31 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
32 | | use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; |
33 | | use nativelink_util::spawn; |
34 | | use nativelink_util::store_trait::{ |
35 | | BoolValue, RemoveItemCallback, SchedulerCurrentVersionProvider, SchedulerIndexProvider, |
36 | | SchedulerStore, SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider, |
37 | | SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo, |
38 | | }; |
39 | | use nativelink_util::task::JoinHandleDropGuard; |
40 | | use parking_lot::{Mutex, RwLock}; |
41 | | use patricia_tree::StringPatriciaMap; |
42 | | use tokio::sync::watch; |
43 | | use tokio::time::sleep; |
44 | | use tracing::{error, info, warn}; |
45 | | |
46 | | use crate::cas_utils::is_zero_digest; |
47 | | |
48 | | /// The default database name if not specified. |
49 | | const DEFAULT_DB_NAME: &str = "nativelink"; |
50 | | |
51 | | /// The default collection name if not specified. |
52 | | const DEFAULT_COLLECTION_NAME: &str = "cas"; |
53 | | |
54 | | /// The default scheduler collection name if not specified. |
55 | | const DEFAULT_SCHEDULER_COLLECTION_NAME: &str = "scheduler"; |
56 | | |
57 | | /// The default size of the read chunk when reading data from `MongoDB`. |
58 | | const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024; |
59 | | |
60 | | /// The default connection timeout in milliseconds if not specified. |
61 | | const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000; |
62 | | |
63 | | /// The default command timeout in milliseconds if not specified. |
64 | | const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000; |
65 | | |
66 | | /// The default maximum number of concurrent uploads. |
67 | | const DEFAULT_MAX_CONCURRENT_UPLOADS: usize = 10; |
68 | | |
69 | | /// The name of the field in `MongoDB` documents that stores the key. |
70 | | const KEY_FIELD: &str = "_id"; |
71 | | |
72 | | /// The name of the field in `MongoDB` documents that stores the data. |
73 | | const DATA_FIELD: &str = "data"; |
74 | | |
75 | | /// The name of the field in `MongoDB` documents that stores the version. |
76 | | const VERSION_FIELD: &str = "version"; |
77 | | |
78 | | /// The name of the field in `MongoDB` documents that stores the size. |
79 | | const SIZE_FIELD: &str = "size"; |
80 | | |
81 | | /// A [`StoreDriver`] implementation that uses `MongoDB` as a backing store. |
82 | | #[derive(Debug, MetricsComponent)] |
83 | | pub struct ExperimentalMongoStore { |
84 | | /// The `MongoDB` client. |
85 | | #[allow(dead_code)] |
86 | | client: MongoClient, |
87 | | |
88 | | /// The database to use. |
89 | | database: Database, |
90 | | |
91 | | /// The collection for CAS data. |
92 | | cas_collection: Collection<Document>, |
93 | | |
94 | | /// The collection for scheduler data. |
95 | | scheduler_collection: Collection<Document>, |
96 | | |
97 | | /// A common prefix to append to all keys before they are sent to `MongoDB`. |
98 | | #[metric(help = "Prefix to append to all keys before sending to MongoDB")] |
99 | | key_prefix: String, |
100 | | |
101 | | /// The amount of data to read from `MongoDB` at a time. |
102 | | #[metric(help = "The amount of data to read from MongoDB at a time")] |
103 | | read_chunk_size: usize, |
104 | | |
105 | | /// The maximum number of concurrent uploads. |
106 | | #[metric(help = "The maximum number of concurrent uploads")] |
107 | | max_concurrent_uploads: usize, |
108 | | |
109 | | /// Enable change streams for real-time updates. |
110 | | #[metric(help = "Whether change streams are enabled")] |
111 | | enable_change_streams: bool, |
112 | | |
113 | | /// A manager for subscriptions to keys in `MongoDB`. |
114 | | subscription_manager: Mutex<Option<Arc<ExperimentalMongoSubscriptionManager>>>, |
115 | | } |
116 | | |
117 | | impl ExperimentalMongoStore { |
118 | | /// Create a new `ExperimentalMongoStore` from the given configuration. |
119 | 3 | pub async fn new(mut spec: ExperimentalMongoSpec) -> Result<Arc<Self>, Error> { |
120 | | // Set defaults |
121 | 3 | if spec.connection_string.is_empty() { Branch (121:12): [True: 0, False: 0]
Branch (121:12): [True: 1, False: 2]
Branch (121:12): [Folded - Ignored]
|
122 | 1 | return Err(make_err!( |
123 | 1 | Code::InvalidArgument, |
124 | 1 | "No connection string was specified in mongo store configuration." |
125 | 1 | )); |
126 | 2 | } |
127 | | |
128 | 2 | if spec.database.is_empty() { Branch (128:12): [True: 0, False: 0]
Branch (128:12): [True: 0, False: 2]
Branch (128:12): [Folded - Ignored]
|
129 | 0 | spec.database = DEFAULT_DB_NAME.to_string(); |
130 | 2 | } |
131 | | |
132 | 2 | if spec.cas_collection.is_empty() { Branch (132:12): [True: 0, False: 0]
Branch (132:12): [True: 0, False: 2]
Branch (132:12): [Folded - Ignored]
|
133 | 0 | spec.cas_collection = DEFAULT_COLLECTION_NAME.to_string(); |
134 | 2 | } |
135 | | |
136 | 2 | if spec.scheduler_collection.is_empty() { Branch (136:12): [True: 0, False: 0]
Branch (136:12): [True: 0, False: 2]
Branch (136:12): [Folded - Ignored]
|
137 | 0 | spec.scheduler_collection = DEFAULT_SCHEDULER_COLLECTION_NAME.to_string(); |
138 | 2 | } |
139 | | |
140 | 2 | if spec.read_chunk_size == 0 { Branch (140:12): [True: 0, False: 0]
Branch (140:12): [True: 0, False: 2]
Branch (140:12): [Folded - Ignored]
|
141 | 0 | spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE; |
142 | 2 | } |
143 | | |
144 | 2 | if spec.connection_timeout_ms == 0 { Branch (144:12): [True: 0, False: 0]
Branch (144:12): [True: 0, False: 2]
Branch (144:12): [Folded - Ignored]
|
145 | 0 | spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS; |
146 | 2 | } |
147 | | |
148 | 2 | if spec.command_timeout_ms == 0 { Branch (148:12): [True: 0, False: 0]
Branch (148:12): [True: 0, False: 2]
Branch (148:12): [Folded - Ignored]
|
149 | 0 | spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS; |
150 | 2 | } |
151 | | |
152 | 2 | if spec.max_concurrent_uploads == 0 { Branch (152:12): [True: 0, False: 0]
Branch (152:12): [True: 0, False: 2]
Branch (152:12): [Folded - Ignored]
|
153 | 0 | spec.max_concurrent_uploads = DEFAULT_MAX_CONCURRENT_UPLOADS; |
154 | 2 | } |
155 | | |
156 | | // Configure client options |
157 | 2 | let mut client_options = ClientOptions::parse(&spec.connection_string) |
158 | 2 | .await |
159 | 2 | .map_err(|e| { |
160 | 1 | make_err!( |
161 | 1 | Code::InvalidArgument, |
162 | | "Failed to parse MongoDB connection string: {e}" |
163 | | ) |
164 | 1 | })?; |
165 | | |
166 | 1 | client_options.server_selection_timeout = |
167 | 1 | Some(Duration::from_millis(spec.connection_timeout_ms)); |
168 | 1 | client_options.connect_timeout = Some(Duration::from_millis(spec.connection_timeout_ms)); |
169 | | |
170 | | // Set write concern if specified |
171 | 1 | if let Some(w) = spec.write_concern_w { Branch (171:16): [True: 0, False: 0]
Branch (171:16): [True: 0, False: 1]
Branch (171:16): [Folded - Ignored]
|
172 | | // Parse write concern w - can be a number or string like "majority" |
173 | 0 | let w_value = if let Ok(num) = w.parse::<u32>() { Branch (173:34): [True: 0, False: 0]
Branch (173:34): [True: 0, False: 0]
Branch (173:34): [Folded - Ignored]
|
174 | 0 | Some(num.into()) |
175 | | } else { |
176 | 0 | Some(w.into()) |
177 | | }; |
178 | | |
179 | | // Build write concern based on which options are set |
180 | 0 | let write_concern = match (w_value, spec.write_concern_j, spec.write_concern_timeout_ms) |
181 | | { |
182 | 0 | (Some(w), Some(j), Some(timeout)) => WriteConcern::builder() |
183 | 0 | .w(Some(w)) |
184 | 0 | .journal(j) |
185 | 0 | .w_timeout(Some(Duration::from_millis(u64::from(timeout)))) |
186 | 0 | .build(), |
187 | 0 | (Some(w), Some(j), None) => WriteConcern::builder().w(Some(w)).journal(j).build(), |
188 | 0 | (Some(w), None, Some(timeout)) => WriteConcern::builder() |
189 | 0 | .w(Some(w)) |
190 | 0 | .w_timeout(Some(Duration::from_millis(u64::from(timeout)))) |
191 | 0 | .build(), |
192 | 0 | (Some(w), None, None) => WriteConcern::builder().w(Some(w)).build(), |
193 | 0 | _ => unreachable!(), // We know w is Some because we're in the if let Some(w) block |
194 | | }; |
195 | | |
196 | 0 | client_options.write_concern = Some(write_concern); |
197 | 1 | } else if spec.write_concern_j.is_some() || spec.write_concern_timeout_ms.is_some() { Branch (197:19): [True: 0, False: 0]
Branch (197:53): [True: 0, False: 0]
Branch (197:19): [True: 1, False: 0]
Branch (197:53): [True: 0, False: 0]
Branch (197:19): [Folded - Ignored]
Branch (197:53): [Folded - Ignored]
|
198 | 1 | return Err(make_err!( |
199 | 1 | Code::InvalidArgument, |
200 | 1 | "write_concern_w not set, but j and/or timeout set. Please set 'write_concern_w' to a non-default value. See https://www.mongodb.com/docs/manual/reference/write-concern/#w-option for options." |
201 | 1 | )); |
202 | 0 | } |
203 | | |
204 | | // Create client |
205 | 0 | let client = MongoClient::with_options(client_options).map_err(|e| { |
206 | 0 | make_err!( |
207 | 0 | Code::InvalidArgument, |
208 | | "Failed to create MongoDB client: {e}" |
209 | | ) |
210 | 0 | })?; |
211 | | |
212 | | // Get database and collections |
213 | 0 | let database = client.database(&spec.database); |
214 | 0 | let cas_collection = database.collection::<Document>(&spec.cas_collection); |
215 | 0 | let scheduler_collection = database.collection::<Document>(&spec.scheduler_collection); |
216 | | |
217 | | // Create indexes |
218 | 0 | Self::create_indexes(&cas_collection, &scheduler_collection).await?; |
219 | | |
220 | 0 | let store = Self { |
221 | 0 | client, |
222 | 0 | database, |
223 | 0 | cas_collection, |
224 | 0 | scheduler_collection, |
225 | 0 | key_prefix: spec.key_prefix.clone().unwrap_or_default(), |
226 | 0 | read_chunk_size: spec.read_chunk_size, |
227 | 0 | max_concurrent_uploads: spec.max_concurrent_uploads, |
228 | 0 | enable_change_streams: spec.enable_change_streams, |
229 | 0 | subscription_manager: Mutex::new(None), |
230 | 0 | }; |
231 | | |
232 | 0 | Ok(Arc::new(store)) |
233 | 3 | } |
234 | | |
235 | | /// Create necessary indexes for efficient operations. |
236 | 0 | async fn create_indexes( |
237 | 0 | cas_collection: &Collection<Document>, |
238 | 0 | _scheduler_collection: &Collection<Document>, |
239 | 0 | ) -> Result<(), Error> { |
240 | | // CAS collection indexes |
241 | 0 | cas_collection |
242 | 0 | .create_index(IndexModel::builder().keys(doc! { SIZE_FIELD: 1 }).build()) |
243 | 0 | .await |
244 | 0 | .map_err(|e| { |
245 | 0 | make_err!( |
246 | 0 | Code::Internal, |
247 | | "Failed to create size index on CAS collection: {e}" |
248 | | ) |
249 | 0 | })?; |
250 | | |
251 | | // Scheduler collection will have dynamic indexes created as needed |
252 | | |
253 | 0 | Ok(()) |
254 | 0 | } |
255 | | |
256 | | /// Encode a [`StoreKey`] so it can be sent to `MongoDB`. |
257 | 0 | fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> { |
258 | 0 | let key_body = key.as_str(); |
259 | 0 | if self.key_prefix.is_empty() { Branch (259:12): [True: 0, False: 0]
Branch (259:12): [Folded - Ignored]
|
260 | 0 | key_body |
261 | | } else { |
262 | 0 | match key_body { |
263 | 0 | Cow::Owned(mut encoded_key) => { |
264 | 0 | encoded_key.insert_str(0, &self.key_prefix); |
265 | 0 | Cow::Owned(encoded_key) |
266 | | } |
267 | 0 | Cow::Borrowed(body) => { |
268 | 0 | let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len()); |
269 | 0 | encoded_key.push_str(&self.key_prefix); |
270 | 0 | encoded_key.push_str(body); |
271 | 0 | Cow::Owned(encoded_key) |
272 | | } |
273 | | } |
274 | | } |
275 | 0 | } |
276 | | |
277 | | /// Decode a key from `MongoDB` by removing the prefix. |
278 | 0 | fn decode_key(&self, key: &str) -> Option<String> { |
279 | 0 | if self.key_prefix.is_empty() { Branch (279:12): [True: 0, False: 0]
Branch (279:12): [Folded - Ignored]
|
280 | 0 | Some(key.to_string()) |
281 | | } else { |
282 | 0 | key.strip_prefix(&self.key_prefix).map(ToString::to_string) |
283 | | } |
284 | 0 | } |
285 | | } |
286 | | |
287 | | #[async_trait] |
288 | | impl StoreDriver for ExperimentalMongoStore { |
289 | | async fn has_with_results( |
290 | | self: Pin<&Self>, |
291 | | keys: &[StoreKey<'_>], |
292 | | results: &mut [Option<u64>], |
293 | 0 | ) -> Result<(), Error> { |
294 | | for (key, result) in keys.iter().zip(results.iter_mut()) { |
295 | | // Handle zero digest specially |
296 | | if is_zero_digest(key.borrow()) { |
297 | | *result = Some(0); |
298 | | continue; |
299 | | } |
300 | | |
301 | | let encoded_key = self.encode_key(key); |
302 | | let filter = doc! { KEY_FIELD: encoded_key.as_ref() }; |
303 | | |
304 | | match self.cas_collection.find_one(filter).await { |
305 | | Ok(Some(doc)) => { |
306 | 0 | *result = doc.get_i64(SIZE_FIELD).ok().map(|v| v as u64); |
307 | | } |
308 | | Ok(None) => { |
309 | | *result = None; |
310 | | } |
311 | | Err(e) => { |
312 | | return Err(make_err!( |
313 | | Code::Internal, |
314 | | "MongoDB error in has_with_results: {e}" |
315 | | )); |
316 | | } |
317 | | } |
318 | | } |
319 | | Ok(()) |
320 | 0 | } |
321 | | |
322 | | async fn list( |
323 | | self: Pin<&Self>, |
324 | | range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>), |
325 | | handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_), |
326 | 0 | ) -> Result<u64, Error> { |
327 | | let mut filter = Document::new(); |
328 | | |
329 | | // Build range query |
330 | | let mut key_filter = Document::new(); |
331 | | match &range.0 { |
332 | | Bound::Included(start) => { |
333 | | let encoded = self.encode_key(start); |
334 | | key_filter.insert("$gte", encoded.as_ref()); |
335 | | } |
336 | | Bound::Excluded(start) => { |
337 | | let encoded = self.encode_key(start); |
338 | | key_filter.insert("$gt", encoded.as_ref()); |
339 | | } |
340 | | Bound::Unbounded => {} |
341 | | } |
342 | | |
343 | | match &range.1 { |
344 | | Bound::Included(end) => { |
345 | | let encoded = self.encode_key(end); |
346 | | key_filter.insert("$lte", encoded.as_ref()); |
347 | | } |
348 | | Bound::Excluded(end) => { |
349 | | let encoded = self.encode_key(end); |
350 | | key_filter.insert("$lt", encoded.as_ref()); |
351 | | } |
352 | | Bound::Unbounded => {} |
353 | | } |
354 | | |
355 | | if !key_filter.is_empty() { |
356 | | filter.insert(KEY_FIELD, key_filter); |
357 | | } |
358 | | |
359 | | // Add prefix filter if needed |
360 | | if !self.key_prefix.is_empty() { |
361 | | let regex_filter = doc! { |
362 | | KEY_FIELD: { |
363 | | "$regex": format!("^{}", regex::escape(&self.key_prefix)), |
364 | | } |
365 | | }; |
366 | | if filter.is_empty() { |
367 | | filter = regex_filter; |
368 | | } else { |
369 | | filter = doc! { "$and": [filter, regex_filter] }; |
370 | | } |
371 | | } |
372 | | |
373 | | let mut cursor = self |
374 | | .cas_collection |
375 | | .find(filter) |
376 | | .projection(doc! { KEY_FIELD: 1 }) |
377 | | .await |
378 | 0 | .map_err(|e| make_err!(Code::Internal, "Failed to create cursor in list: {e}"))?; |
379 | | |
380 | | let mut count = 0u64; |
381 | | while let Some(doc) = cursor |
382 | | .try_next() |
383 | | .await |
384 | 0 | .map_err(|e| make_err!(Code::Internal, "Failed to get next document in list: {e}"))? |
385 | | { |
386 | | if let Ok(key) = doc.get_str(KEY_FIELD) { |
387 | | if let Some(decoded_key) = self.decode_key(key) { |
388 | | let store_key = StoreKey::new_str(&decoded_key); |
389 | | count += 1; |
390 | | if !handler(&store_key) { |
391 | | break; |
392 | | } |
393 | | } |
394 | | } |
395 | | } |
396 | | |
397 | | Ok(count) |
398 | 0 | } |
399 | | |
400 | | async fn update( |
401 | | self: Pin<&Self>, |
402 | | key: StoreKey<'_>, |
403 | | mut reader: DropCloserReadHalf, |
404 | | upload_size: UploadSizeInfo, |
405 | 0 | ) -> Result<(), Error> { |
406 | | let encoded_key = self.encode_key(&key); |
407 | | |
408 | | // Handle zero digest |
409 | | if is_zero_digest(key.borrow()) { |
410 | 0 | let chunk = reader.peek().await.map_err(|e| { |
411 | 0 | make_err!( |
412 | 0 | Code::Internal, |
413 | | "Failed to peek in ExperimentalMongoStore::update: {e}" |
414 | | ) |
415 | 0 | })?; |
416 | | if chunk.is_empty() { |
417 | 0 | reader.drain().await.map_err(|e| { |
418 | 0 | make_err!( |
419 | 0 | Code::Internal, |
420 | | "Failed to drain in ExperimentalMongoStore::update: {e}" |
421 | | ) |
422 | 0 | })?; |
423 | | return Ok(()); |
424 | | } |
425 | | } |
426 | | |
427 | | // Special handling for MaxSize(0) - this should error if stream is closed |
428 | | if upload_size == UploadSizeInfo::MaxSize(0) { |
429 | | // Try to read from the stream - if it's closed immediately, this should error |
430 | | match reader.recv().await { |
431 | | Ok(_chunk) => { |
432 | | return Err(make_input_err!( |
433 | | "Received data when MaxSize is 0 in MongoDB store" |
434 | | )); |
435 | | } |
436 | | Err(e) => { |
437 | | return Err(make_err!( |
438 | | Code::InvalidArgument, |
439 | | "Stream closed for MaxSize(0) upload: {e}" |
440 | | )); |
441 | | } |
442 | | } |
443 | | } |
444 | | |
445 | | // Read all data into memory with proper EOF handling |
446 | | let mut data = Vec::new(); |
447 | | while let Ok(chunk) = reader.recv().await { |
448 | | if chunk.is_empty() { |
449 | | break; // Empty chunk signals EOF |
450 | | } |
451 | | data.extend_from_slice(&chunk); |
452 | | } |
453 | | |
454 | | let size = data.len() as i64; |
455 | | |
456 | | // Create document |
457 | | let doc = doc! { |
458 | | KEY_FIELD: encoded_key.as_ref(), |
459 | | DATA_FIELD: Bson::Binary(mongodb::bson::Binary { |
460 | | subtype: mongodb::bson::spec::BinarySubtype::Generic, |
461 | | bytes: data, |
462 | | }), |
463 | | SIZE_FIELD: size, |
464 | | }; |
465 | | |
466 | | // Upsert the document |
467 | | self.cas_collection |
468 | | .update_one( |
469 | | doc! { KEY_FIELD: encoded_key.as_ref() }, |
470 | | doc! { "$set": doc }, |
471 | | ) |
472 | | .upsert(true) |
473 | | .await |
474 | 0 | .map_err(|e| make_err!(Code::Internal, "Failed to update document in MongoDB: {e}"))?; |
475 | | |
476 | | Ok(()) |
477 | 0 | } |
478 | | |
479 | | async fn get_part( |
480 | | self: Pin<&Self>, |
481 | | key: StoreKey<'_>, |
482 | | writer: &mut DropCloserWriteHalf, |
483 | | offset: u64, |
484 | | length: Option<u64>, |
485 | 0 | ) -> Result<(), Error> { |
486 | | // Handle zero digest |
487 | | if is_zero_digest(key.borrow()) { |
488 | 0 | return writer.send_eof().map_err(|e| { |
489 | 0 | make_err!( |
490 | 0 | Code::Internal, |
491 | | "Failed to send zero EOF in mongo store get_part: {e}" |
492 | | ) |
493 | 0 | }); |
494 | | } |
495 | | |
496 | | let encoded_key = self.encode_key(&key); |
497 | | let filter = doc! { KEY_FIELD: encoded_key.as_ref() }; |
498 | | |
499 | | let doc = self |
500 | | .cas_collection |
501 | | .find_one(filter) |
502 | | .await |
503 | 0 | .map_err(|e| make_err!(Code::Internal, "Failed to find document in get_part: {e}"))? |
504 | 0 | .ok_or_else(|| { |
505 | 0 | make_err!( |
506 | 0 | Code::NotFound, |
507 | | "Data not found in MongoDB store for digest: {key:?}" |
508 | | ) |
509 | 0 | })?; |
510 | | |
511 | | let data = match doc.get(DATA_FIELD) { |
512 | | Some(Bson::Binary(binary)) => &binary.bytes, |
513 | | _ => { |
514 | | return Err(make_err!( |
515 | | Code::Internal, |
516 | | "Invalid data field in MongoDB document" |
517 | | )); |
518 | | } |
519 | | }; |
520 | | |
521 | | let offset = usize::try_from(offset).unwrap_or(usize::MAX); |
522 | | let data_len = data.len(); |
523 | | |
524 | | if offset > data_len { |
525 | 0 | return writer.send_eof().map_err(|e| { |
526 | 0 | make_err!( |
527 | 0 | Code::Internal, |
528 | | "Failed to send EOF in mongo store get_part: {e}" |
529 | | ) |
530 | 0 | }); |
531 | | } |
532 | | |
533 | | let end = if let Some(len) = length { |
534 | | cmp::min( |
535 | | offset.saturating_add(usize::try_from(len).unwrap_or(usize::MAX)), |
536 | | data_len, |
537 | | ) |
538 | | } else { |
539 | | data_len |
540 | | }; |
541 | | |
542 | | if offset < end { |
543 | | let chunk = &data[offset..end]; |
544 | | |
545 | | // Send data in chunks |
546 | | for chunk_data in chunk.chunks(self.read_chunk_size) { |
547 | 0 | writer.send(chunk_data.to_vec().into()).await.map_err(|e| { |
548 | 0 | make_err!( |
549 | 0 | Code::Internal, |
550 | | "Failed to write data in ExperimentalMongoStore::get_part: {e}" |
551 | | ) |
552 | 0 | })?; |
553 | | } |
554 | | } |
555 | | |
556 | 0 | writer.send_eof().map_err(|e| { |
557 | 0 | make_err!( |
558 | 0 | Code::Internal, |
559 | | "Failed to write EOF in mongo store get_part: {e}" |
560 | | ) |
561 | 0 | }) |
562 | 0 | } |
563 | | |
564 | 0 | fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver { |
565 | 0 | self |
566 | 0 | } |
567 | | |
568 | 0 | fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) { |
569 | 0 | self |
570 | 0 | } |
571 | | |
572 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> { |
573 | 0 | self |
574 | 0 | } |
575 | | |
576 | 0 | fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) { |
577 | 0 | registry.register_indicator(self); |
578 | 0 | } |
579 | | |
580 | 0 | fn register_remove_callback( |
581 | 0 | self: Arc<Self>, |
582 | 0 | _callback: Arc<dyn RemoveItemCallback>, |
583 | 0 | ) -> Result<(), Error> { |
584 | | // drop because we don't remove anything from Mongo |
585 | 0 | Ok(()) |
586 | 0 | } |
587 | | } |
588 | | |
589 | | #[async_trait] |
590 | | impl HealthStatusIndicator for ExperimentalMongoStore { |
591 | 0 | fn get_name(&self) -> &'static str { |
592 | 0 | "ExperimentalMongoStore" |
593 | 0 | } |
594 | | |
595 | 0 | async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { |
596 | | match self.database.run_command(doc! { "ping": 1 }).await { |
597 | | Ok(_) => HealthStatus::new_ok(self, "Connection healthy".into()), |
598 | | Err(e) => HealthStatus::new_failed( |
599 | | self, |
600 | | format!("{namespace} - MongoDB connection error: {e}").into(), |
601 | | ), |
602 | | } |
603 | 0 | } |
604 | | } |
605 | | |
606 | | // ------------------------------------------------------------------- |
607 | | // |
608 | | // `ExperimentalMongoDB` scheduler implementation. Likely to change |
609 | | // ------------------------------------------------------------------- |
610 | | |
611 | | /// An individual subscription to a key in `ExperimentalMongoDB`. |
612 | | #[derive(Debug)] |
613 | | pub struct ExperimentalMongoSubscription { |
614 | | receiver: Option<watch::Receiver<String>>, |
615 | | weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<ExperimentalMongoSubscriptionPublisher>>>, |
616 | | } |
617 | | |
618 | | impl SchedulerSubscription for ExperimentalMongoSubscription { |
619 | 0 | async fn changed(&mut self) -> Result<(), Error> { |
620 | 0 | let receiver = self.receiver.as_mut().ok_or_else(|| { |
621 | 0 | make_err!( |
622 | 0 | Code::Internal, |
623 | | "In ExperimentalMongoSubscription::changed::as_mut" |
624 | | ) |
625 | 0 | })?; |
626 | 0 | receiver.changed().await.map_err(|_| { |
627 | 0 | make_err!( |
628 | 0 | Code::Internal, |
629 | | "In ExperimentalMongoSubscription::changed::changed" |
630 | | ) |
631 | 0 | }) |
632 | 0 | } |
633 | | } |
634 | | |
635 | | impl Drop for ExperimentalMongoSubscription { |
636 | 0 | fn drop(&mut self) { |
637 | 0 | let Some(receiver) = self.receiver.take() else { Branch (637:13): [True: 0, False: 0]
Branch (637:13): [Folded - Ignored]
|
638 | 0 | warn!("ExperimentalMongoSubscription has already been dropped, nothing to do."); |
639 | 0 | return; |
640 | | }; |
641 | 0 | let key = receiver.borrow().clone(); |
642 | 0 | drop(receiver); |
643 | | |
644 | 0 | let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else { Branch (644:13): [True: 0, False: 0]
Branch (644:13): [Folded - Ignored]
|
645 | 0 | return; |
646 | | }; |
647 | 0 | let mut subscribed_keys = subscribed_keys.write(); |
648 | 0 | let Some(value) = subscribed_keys.get(&key) else { Branch (648:13): [True: 0, False: 0]
Branch (648:13): [Folded - Ignored]
|
649 | 0 | error!( |
650 | 0 | "Key {key} was not found in subscribed keys when checking if it should be removed." |
651 | | ); |
652 | 0 | return; |
653 | | }; |
654 | | |
655 | 0 | if value.receiver_count() == 0 { Branch (655:12): [True: 0, False: 0]
Branch (655:12): [Folded - Ignored]
|
656 | 0 | subscribed_keys.remove(key); |
657 | 0 | } |
658 | 0 | } |
659 | | } |
660 | | |
661 | | /// A publisher for a key in `ExperimentalMongoDB`. |
662 | | #[derive(Debug)] |
663 | | struct ExperimentalMongoSubscriptionPublisher { |
664 | | sender: Mutex<watch::Sender<String>>, |
665 | | } |
666 | | |
667 | | impl ExperimentalMongoSubscriptionPublisher { |
668 | 0 | fn new( |
669 | 0 | key: String, |
670 | 0 | weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>, |
671 | 0 | ) -> (Self, ExperimentalMongoSubscription) { |
672 | 0 | let (sender, receiver) = watch::channel(key); |
673 | 0 | let publisher = Self { |
674 | 0 | sender: Mutex::new(sender), |
675 | 0 | }; |
676 | 0 | let subscription = ExperimentalMongoSubscription { |
677 | 0 | receiver: Some(receiver), |
678 | 0 | weak_subscribed_keys, |
679 | 0 | }; |
680 | 0 | (publisher, subscription) |
681 | 0 | } |
682 | | |
683 | 0 | fn subscribe( |
684 | 0 | &self, |
685 | 0 | weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>, |
686 | 0 | ) -> ExperimentalMongoSubscription { |
687 | 0 | let receiver = self.sender.lock().subscribe(); |
688 | 0 | ExperimentalMongoSubscription { |
689 | 0 | receiver: Some(receiver), |
690 | 0 | weak_subscribed_keys, |
691 | 0 | } |
692 | 0 | } |
693 | | |
694 | 0 | fn receiver_count(&self) -> usize { |
695 | 0 | self.sender.lock().receiver_count() |
696 | 0 | } |
697 | | |
698 | 0 | fn notify(&self) { |
699 | 0 | self.sender.lock().send_modify(|_| {}); |
700 | 0 | } |
701 | | } |
702 | | |
703 | | #[derive(Debug)] |
704 | | pub struct ExperimentalMongoSubscriptionManager { |
705 | | subscribed_keys: Arc<RwLock<StringPatriciaMap<ExperimentalMongoSubscriptionPublisher>>>, |
706 | | _subscription_spawn: JoinHandleDropGuard<()>, |
707 | | } |
708 | | |
709 | | impl ExperimentalMongoSubscriptionManager { |
710 | 0 | pub fn new(database: Database, collection_name: String, key_prefix: String) -> Self { |
711 | 0 | let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new())); |
712 | 0 | let subscribed_keys_weak = Arc::downgrade(&subscribed_keys); |
713 | | |
714 | | Self { |
715 | 0 | subscribed_keys, |
716 | 0 | _subscription_spawn: spawn!("mongo_subscribe_spawn", async move { |
717 | 0 | let collection = database.collection::<Document>(&collection_name); |
718 | | |
719 | | loop { |
720 | | // Try to create change stream |
721 | 0 | let change_stream_result = collection.watch().await; |
722 | | |
723 | 0 | match change_stream_result { |
724 | 0 | Ok(mut change_stream) => { |
725 | 0 | info!("MongoDB change stream connected"); |
726 | | |
727 | 0 | while let Some(event_result) = change_stream.next().await { Branch (727:39): [True: 0, False: 0]
Branch (727:39): [Folded - Ignored]
|
728 | 0 | match event_result { |
729 | 0 | Ok(event) => { |
730 | | use mongodb::change_stream::event::OperationType; |
731 | 0 | match event.operation_type { |
732 | | OperationType::Insert |
733 | | | OperationType::Update |
734 | | | OperationType::Replace |
735 | | | OperationType::Delete => { |
736 | 0 | if let Some(doc_key) = event.document_key { Branch (736:56): [True: 0, False: 0]
Branch (736:56): [Folded - Ignored]
|
737 | 0 | if let Ok(key) = doc_key.get_str(KEY_FIELD) { Branch (737:60): [True: 0, False: 0]
Branch (737:60): [Folded - Ignored]
|
738 | | // Remove prefix if present |
739 | 0 | let key = if key_prefix.is_empty() { Branch (739:70): [True: 0, False: 0]
Branch (739:70): [Folded - Ignored]
|
740 | 0 | key |
741 | | } else { |
742 | 0 | key.strip_prefix(&key_prefix) |
743 | 0 | .unwrap_or(key) |
744 | | }; |
745 | | |
746 | 0 | let Some(subscribed_keys) = Branch (746:61): [True: 0, False: 0]
Branch (746:61): [Folded - Ignored]
|
747 | 0 | subscribed_keys_weak.upgrade() |
748 | | else { |
749 | 0 | warn!( |
750 | 0 | "Parent dropped, exiting ExperimentalMongoSubscriptionManager" |
751 | | ); |
752 | 0 | return; |
753 | | }; |
754 | | |
755 | 0 | let subscribed_keys_mux = |
756 | 0 | subscribed_keys.read(); |
757 | 0 | subscribed_keys_mux |
758 | 0 | .common_prefix_values(key) |
759 | 0 | .for_each(ExperimentalMongoSubscriptionPublisher::notify); |
760 | 0 | } |
761 | 0 | } |
762 | | } |
763 | 0 | _ => {} |
764 | | } |
765 | | } |
766 | 0 | Err(e) => { |
767 | 0 | error!("Error in change stream: {e}"); |
768 | 0 | break; |
769 | | } |
770 | | } |
771 | | } |
772 | | } |
773 | 0 | Err(e) => { |
774 | 0 | warn!("Failed to create change stream: {e}. Will retry in 5 seconds."); |
775 | | } |
776 | | } |
777 | | |
778 | | // Check if parent is still alive |
779 | 0 | if subscribed_keys_weak.upgrade().is_none() { Branch (779:24): [True: 0, False: 0]
Branch (779:24): [Folded - Ignored]
|
780 | 0 | warn!("Parent dropped, exiting ExperimentalMongoSubscriptionManager"); |
781 | 0 | return; |
782 | 0 | } |
783 | | |
784 | | // Sleep before retry |
785 | 0 | sleep(Duration::from_secs(5)).await; |
786 | | |
787 | | // Notify all subscribers on reconnection |
788 | 0 | if let Some(subscribed_keys) = subscribed_keys_weak.upgrade() { Branch (788:28): [True: 0, False: 0]
Branch (788:28): [Folded - Ignored]
|
789 | 0 | let subscribed_keys_mux = subscribed_keys.read(); |
790 | 0 | for publisher in subscribed_keys_mux.values() { |
791 | 0 | publisher.notify(); |
792 | 0 | } |
793 | 0 | } |
794 | | } |
795 | 0 | }), |
796 | | } |
797 | 0 | } |
798 | | } |
799 | | |
800 | | impl SchedulerSubscriptionManager for ExperimentalMongoSubscriptionManager { |
801 | | type Subscription = ExperimentalMongoSubscription; |
802 | | |
803 | 0 | fn notify_for_test(&self, value: String) { |
804 | 0 | let subscribed_keys_mux = self.subscribed_keys.read(); |
805 | 0 | subscribed_keys_mux |
806 | 0 | .common_prefix_values(&value) |
807 | 0 | .for_each(ExperimentalMongoSubscriptionPublisher::notify); |
808 | 0 | } |
809 | | |
810 | 0 | fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error> |
811 | 0 | where |
812 | 0 | K: SchedulerStoreKeyProvider, |
813 | | { |
814 | 0 | let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys); |
815 | 0 | let mut subscribed_keys = self.subscribed_keys.write(); |
816 | 0 | let key = key.get_key(); |
817 | 0 | let key_str = key.as_str(); |
818 | | |
819 | 0 | let mut subscription = if let Some(publisher) = subscribed_keys.get(&key_str) { Branch (819:39): [Folded - Ignored]
Branch (819:39): [Folded - Ignored]
|
820 | 0 | publisher.subscribe(weak_subscribed_keys) |
821 | | } else { |
822 | 0 | let (publisher, subscription) = ExperimentalMongoSubscriptionPublisher::new( |
823 | 0 | key_str.to_string(), |
824 | 0 | weak_subscribed_keys, |
825 | 0 | ); |
826 | 0 | subscribed_keys.insert(key_str, publisher); |
827 | 0 | subscription |
828 | | }; |
829 | | |
830 | 0 | subscription |
831 | 0 | .receiver |
832 | 0 | .as_mut() |
833 | 0 | .ok_or_else(|| { |
834 | 0 | make_err!( |
835 | 0 | Code::Internal, |
836 | | "Receiver should be set in ExperimentalMongoSubscriptionManager::subscribe" |
837 | | ) |
838 | 0 | })? |
839 | 0 | .mark_changed(); |
840 | | |
841 | 0 | Ok(subscription) |
842 | 0 | } |
843 | | |
844 | 0 | fn is_reliable() -> bool { |
845 | 0 | true |
846 | 0 | } |
847 | | } |
848 | | |
849 | | impl SchedulerStore for ExperimentalMongoStore { |
850 | | type SubscriptionManager = ExperimentalMongoSubscriptionManager; |
851 | | |
852 | 0 | fn subscription_manager(&self) -> Result<Arc<ExperimentalMongoSubscriptionManager>, Error> { |
853 | 0 | let mut subscription_manager = self.subscription_manager.lock(); |
854 | 0 | if let Some(subscription_manager) = &*subscription_manager { Branch (854:16): [True: 0, False: 0]
Branch (854:16): [Folded - Ignored]
|
855 | 0 | Ok(subscription_manager.clone()) |
856 | | } else { |
857 | 0 | if !self.enable_change_streams { Branch (857:16): [True: 0, False: 0]
Branch (857:16): [Folded - Ignored]
|
858 | 0 | return Err(make_input_err!( |
859 | 0 | "ExperimentalMongoStore must have change streams enabled for scheduler subscriptions" |
860 | 0 | )); |
861 | 0 | } |
862 | | |
863 | 0 | let sub = Arc::new(ExperimentalMongoSubscriptionManager::new( |
864 | 0 | self.database.clone(), |
865 | 0 | self.scheduler_collection.name().to_string(), |
866 | 0 | self.key_prefix.clone(), |
867 | | )); |
868 | 0 | *subscription_manager = Some(sub.clone()); |
869 | 0 | Ok(sub) |
870 | | } |
871 | 0 | } |
872 | | |
873 | 0 | async fn update_data<T>(&self, data: T) -> Result<Option<i64>, Error> |
874 | 0 | where |
875 | 0 | T: SchedulerStoreDataProvider |
876 | 0 | + SchedulerStoreKeyProvider |
877 | 0 | + SchedulerCurrentVersionProvider |
878 | 0 | + Send, |
879 | 0 | { |
880 | 0 | let key = data.get_key(); |
881 | 0 | let encoded_key = self.encode_key(&key); |
882 | 0 | let maybe_index = data.get_indexes().map_err(|e| { |
883 | 0 | make_err!( |
884 | 0 | Code::Internal, |
885 | | "Error getting indexes in ExperimentalMongoStore::update_data: {e}" |
886 | | ) |
887 | 0 | })?; |
888 | | |
889 | 0 | if <T as SchedulerStoreKeyProvider>::Versioned::VALUE { Branch (889:12): [Folded - Ignored]
Branch (889:12): [True: 0, False: 0]
Branch (889:12): [Folded - Ignored]
|
890 | 0 | let current_version = data.current_version(); |
891 | 0 | let data_bytes = data.try_into_bytes().map_err(|e| { |
892 | 0 | make_err!( |
893 | 0 | Code::Internal, |
894 | | "Could not convert value to bytes in ExperimentalMongoStore::update_data: {e}" |
895 | | ) |
896 | 0 | })?; |
897 | | |
898 | 0 | let mut update_doc = doc! { |
899 | | "$set": { |
900 | 0 | DATA_FIELD: Bson::Binary(mongodb::bson::Binary { |
901 | 0 | subtype: mongodb::bson::spec::BinarySubtype::Generic, |
902 | 0 | bytes: data_bytes.to_vec(), |
903 | 0 | }), |
904 | | }, |
905 | | "$inc": { |
906 | | VERSION_FIELD: 1i64, |
907 | | } |
908 | | }; |
909 | | |
910 | | // Add indexes |
911 | 0 | for (name, value) in maybe_index { |
912 | 0 | update_doc.get_document_mut("$set").unwrap().insert( |
913 | 0 | name, |
914 | 0 | Bson::Binary(mongodb::bson::Binary { |
915 | 0 | subtype: mongodb::bson::spec::BinarySubtype::Generic, |
916 | 0 | bytes: value.to_vec(), |
917 | 0 | }), |
918 | 0 | ); |
919 | 0 | } |
920 | | |
921 | 0 | let filter = doc! { |
922 | 0 | KEY_FIELD: encoded_key.as_ref(), |
923 | 0 | VERSION_FIELD: current_version, |
924 | | }; |
925 | | |
926 | 0 | match self |
927 | 0 | .scheduler_collection |
928 | 0 | .find_one_and_update(filter, update_doc) |
929 | 0 | .upsert(true) |
930 | 0 | .return_document(ReturnDocument::After) |
931 | 0 | .await |
932 | | { |
933 | 0 | Ok(Some(doc)) => Ok(doc.get_i64(VERSION_FIELD).ok().or(Some(1i64))), |
934 | 0 | Ok(None) => Ok(None), |
935 | 0 | Err(e) => Err(make_err!( |
936 | 0 | Code::Internal, |
937 | 0 | "MongoDB error in update_data: {e}" |
938 | 0 | )), |
939 | | } |
940 | | } else { |
941 | 0 | let data_bytes = data.try_into_bytes().map_err(|e| { |
942 | 0 | make_err!( |
943 | 0 | Code::Internal, |
944 | | "Could not convert value to bytes in ExperimentalMongoStore::update_data: {e}" |
945 | | ) |
946 | 0 | })?; |
947 | | |
948 | 0 | let mut doc = doc! { |
949 | 0 | KEY_FIELD: encoded_key.as_ref(), |
950 | 0 | DATA_FIELD: Bson::Binary(mongodb::bson::Binary { |
951 | 0 | subtype: mongodb::bson::spec::BinarySubtype::Generic, |
952 | 0 | bytes: data_bytes.to_vec(), |
953 | 0 | }), |
954 | | }; |
955 | | |
956 | | // Add indexes |
957 | 0 | for (name, value) in maybe_index { |
958 | 0 | doc.insert( |
959 | 0 | name, |
960 | 0 | Bson::Binary(mongodb::bson::Binary { |
961 | 0 | subtype: mongodb::bson::spec::BinarySubtype::Generic, |
962 | 0 | bytes: value.to_vec(), |
963 | 0 | }), |
964 | 0 | ); |
965 | 0 | } |
966 | | |
967 | 0 | self.scheduler_collection |
968 | 0 | .update_one( |
969 | 0 | doc! { KEY_FIELD: encoded_key.as_ref() }, |
970 | 0 | doc! { "$set": doc }, |
971 | 0 | ) |
972 | 0 | .upsert(true) |
973 | 0 | .await |
974 | 0 | .map_err(|e| { |
975 | 0 | make_err!(Code::Internal, "Failed to update scheduler document: {e}") |
976 | 0 | })?; |
977 | | |
978 | 0 | Ok(Some(0)) |
979 | | } |
980 | 0 | } |
981 | | |
982 | 0 | async fn search_by_index_prefix<K>( |
983 | 0 | &self, |
984 | 0 | index: K, |
985 | 0 | ) -> Result< |
986 | 0 | impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send, |
987 | 0 | Error, |
988 | 0 | > |
989 | 0 | where |
990 | 0 | K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send, |
991 | 0 | { |
992 | 0 | let index_value = index.index_value(); |
993 | | |
994 | | // Create index if it doesn't exist |
995 | 0 | let index_name = format!("{}_{}", K::KEY_PREFIX, K::INDEX_NAME); |
996 | 0 | self.scheduler_collection |
997 | 0 | .create_index( |
998 | 0 | IndexModel::builder() |
999 | 0 | .keys(doc! { K::INDEX_NAME: 1 }) |
1000 | 0 | .options(IndexOptions::builder().name(index_name).build()) |
1001 | 0 | .build(), |
1002 | 0 | ) |
1003 | 0 | .await |
1004 | 0 | .map_err(|e| make_err!(Code::Internal, "Failed to create scheduler index: {e}"))?; |
1005 | | |
1006 | | // Build filter |
1007 | 0 | let filter = doc! { |
1008 | | K::INDEX_NAME: { |
1009 | 0 | "$regex": format!("^{}", regex::escape(index_value.as_ref())), |
1010 | | } |
1011 | | }; |
1012 | | |
1013 | | // Add sort if specified |
1014 | 0 | let find_options = if let Some(sort_key) = K::MAYBE_SORT_KEY { Branch (1014:35): [Folded - Ignored]
Branch (1014:35): [True: 0, False: 0]
Branch (1014:35): [True: 0, False: 0]
Branch (1014:35): [Folded - Ignored]
|
1015 | 0 | FindOptions::builder().sort(doc! { sort_key: 1 }).build() |
1016 | | } else { |
1017 | 0 | FindOptions::default() |
1018 | | }; |
1019 | | |
1020 | 0 | let cursor = self |
1021 | 0 | .scheduler_collection |
1022 | 0 | .find(filter) |
1023 | 0 | .with_options(find_options) |
1024 | 0 | .await |
1025 | 0 | .map_err(|e| { |
1026 | 0 | make_err!( |
1027 | 0 | Code::Internal, |
1028 | | "Failed to create cursor in search_by_index_prefix: {e}" |
1029 | | ) |
1030 | 0 | })?; |
1031 | | |
1032 | 0 | Ok(cursor.map(move |result| { |
1033 | 0 | let doc = result.map_err(|e| { |
1034 | 0 | make_err!( |
1035 | 0 | Code::Internal, |
1036 | | "Error reading document in search_by_index_prefix: {e}" |
1037 | | ) |
1038 | 0 | })?; |
1039 | | |
1040 | 0 | let data = match doc.get(DATA_FIELD) { |
1041 | 0 | Some(Bson::Binary(binary)) => Bytes::from(binary.bytes.clone()), |
1042 | | _ => { |
1043 | 0 | return Err(make_err!( |
1044 | 0 | Code::Internal, |
1045 | 0 | "Missing or invalid data field in search_by_index_prefix" |
1046 | 0 | )); |
1047 | | } |
1048 | | }; |
1049 | | |
1050 | 0 | let version = if <K as SchedulerIndexProvider>::Versioned::VALUE { Branch (1050:30): [Folded - Ignored]
Branch (1050:30): [True: 0, False: 0]
Branch (1050:30): [True: 0, False: 0]
Branch (1050:30): [Folded - Ignored]
|
1051 | 0 | doc.get_i64(VERSION_FIELD).unwrap_or(0) |
1052 | | } else { |
1053 | 0 | 0 |
1054 | | }; |
1055 | | |
1056 | 0 | K::decode(version, data).map_err(|e| { |
1057 | 0 | make_err!( |
1058 | 0 | Code::Internal, |
1059 | | "Failed to decode in search_by_index_prefix: {e}" |
1060 | | ) |
1061 | 0 | }) |
1062 | 0 | })) |
1063 | 0 | } |
1064 | | |
1065 | 0 | async fn get_and_decode<K>( |
1066 | 0 | &self, |
1067 | 0 | key: K, |
1068 | 0 | ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error> |
1069 | 0 | where |
1070 | 0 | K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send, |
1071 | 0 | { |
1072 | 0 | let key = key.get_key(); |
1073 | 0 | let encoded_key = self.encode_key(&key); |
1074 | 0 | let filter = doc! { KEY_FIELD: encoded_key.as_ref() }; |
1075 | | |
1076 | 0 | let doc = self |
1077 | 0 | .scheduler_collection |
1078 | 0 | .find_one(filter) |
1079 | 0 | .await |
1080 | 0 | .map_err(|e| { |
1081 | 0 | make_err!( |
1082 | 0 | Code::Internal, |
1083 | | "Failed to find document in get_and_decode: {e}" |
1084 | | ) |
1085 | 0 | })?; |
1086 | | |
1087 | 0 | let Some(doc) = doc else { Branch (1087:13): [Folded - Ignored]
Branch (1087:13): [True: 0, False: 0]
Branch (1087:13): [Folded - Ignored]
|
1088 | 0 | return Ok(None); |
1089 | | }; |
1090 | | |
1091 | 0 | let data = match doc.get(DATA_FIELD) { |
1092 | 0 | Some(Bson::Binary(binary)) => Bytes::from(binary.bytes.clone()), |
1093 | 0 | _ => return Ok(None), |
1094 | | }; |
1095 | | |
1096 | 0 | let version = if <K as SchedulerStoreKeyProvider>::Versioned::VALUE { Branch (1096:26): [Folded - Ignored]
Branch (1096:26): [True: 0, False: 0]
Branch (1096:26): [Folded - Ignored]
|
1097 | 0 | doc.get_i64(VERSION_FIELD).unwrap_or(0) |
1098 | | } else { |
1099 | 0 | 0 |
1100 | | }; |
1101 | | |
1102 | 0 | Ok(Some(K::decode(version, data).map_err(|e| { |
1103 | 0 | make_err!(Code::Internal, "Failed to decode in get_and_decode: {e}") |
1104 | 0 | })?)) |
1105 | 0 | } |
1106 | | } |