/build/source/nativelink-store/src/mongo_store.rs
Line | Count | Source |
1 | | // Copyright 2025 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::cmp; |
16 | | use core::ops::Bound; |
17 | | use core::pin::Pin; |
18 | | use core::time::Duration; |
19 | | use std::borrow::Cow; |
20 | | use std::sync::{Arc, Weak}; |
21 | | |
22 | | use async_trait::async_trait; |
23 | | use bytes::Bytes; |
24 | | use futures::stream::{Stream, StreamExt, TryStreamExt}; |
25 | | use mongodb::bson::{Bson, Document, doc}; |
26 | | use mongodb::options::{ClientOptions, FindOptions, IndexOptions, ReturnDocument, WriteConcern}; |
27 | | use mongodb::{Client as MongoClient, Collection, Database, IndexModel}; |
28 | | use nativelink_config::stores::ExperimentalMongoSpec; |
29 | | use nativelink_error::{Code, Error, make_err, make_input_err}; |
30 | | use nativelink_metric::MetricsComponent; |
31 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
32 | | use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; |
33 | | use nativelink_util::spawn; |
34 | | use nativelink_util::store_trait::{ |
35 | | BoolValue, SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore, |
36 | | SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider, |
37 | | SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo, |
38 | | }; |
39 | | use nativelink_util::task::JoinHandleDropGuard; |
40 | | use parking_lot::{Mutex, RwLock}; |
41 | | use patricia_tree::StringPatriciaMap; |
42 | | use tokio::sync::watch; |
43 | | use tokio::time::sleep; |
44 | | use tracing::{error, info, warn}; |
45 | | |
46 | | use crate::cas_utils::is_zero_digest; |
47 | | |
48 | | /// The default database name if not specified. |
49 | | const DEFAULT_DB_NAME: &str = "nativelink"; |
50 | | |
51 | | /// The default collection name if not specified. |
52 | | const DEFAULT_COLLECTION_NAME: &str = "cas"; |
53 | | |
54 | | /// The default scheduler collection name if not specified. |
55 | | const DEFAULT_SCHEDULER_COLLECTION_NAME: &str = "scheduler"; |
56 | | |
57 | | /// The default size of the read chunk when reading data from `MongoDB`. |
58 | | const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024; |
59 | | |
60 | | /// The default connection timeout in milliseconds if not specified. |
61 | | const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000; |
62 | | |
63 | | /// The default command timeout in milliseconds if not specified. |
64 | | const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000; |
65 | | |
66 | | /// The default maximum number of concurrent uploads. |
67 | | const DEFAULT_MAX_CONCURRENT_UPLOADS: usize = 10; |
68 | | |
69 | | /// The name of the field in `MongoDB` documents that stores the key. |
70 | | const KEY_FIELD: &str = "_id"; |
71 | | |
72 | | /// The name of the field in `MongoDB` documents that stores the data. |
73 | | const DATA_FIELD: &str = "data"; |
74 | | |
75 | | /// The name of the field in `MongoDB` documents that stores the version. |
76 | | const VERSION_FIELD: &str = "version"; |
77 | | |
78 | | /// The name of the field in `MongoDB` documents that stores the size. |
79 | | const SIZE_FIELD: &str = "size"; |
80 | | |
81 | | /// A [`StoreDriver`] implementation that uses `MongoDB` as a backing store. |
82 | | #[derive(Debug, MetricsComponent)] |
83 | | pub struct ExperimentalMongoStore { |
84 | | /// The `MongoDB` client. |
85 | | #[allow(dead_code)] |
86 | | client: MongoClient, |
87 | | |
88 | | /// The database to use. |
89 | | database: Database, |
90 | | |
91 | | /// The collection for CAS data. |
92 | | cas_collection: Collection<Document>, |
93 | | |
94 | | /// The collection for scheduler data. |
95 | | scheduler_collection: Collection<Document>, |
96 | | |
97 | | /// A common prefix to append to all keys before they are sent to `MongoDB`. |
98 | | #[metric(help = "Prefix to append to all keys before sending to MongoDB")] |
99 | | key_prefix: String, |
100 | | |
101 | | /// The amount of data to read from `MongoDB` at a time. |
102 | | #[metric(help = "The amount of data to read from MongoDB at a time")] |
103 | | read_chunk_size: usize, |
104 | | |
105 | | /// The maximum number of concurrent uploads. |
106 | | #[metric(help = "The maximum number of concurrent uploads")] |
107 | | max_concurrent_uploads: usize, |
108 | | |
109 | | /// Enable change streams for real-time updates. |
110 | | #[metric(help = "Whether change streams are enabled")] |
111 | | enable_change_streams: bool, |
112 | | |
113 | | /// A manager for subscriptions to keys in `MongoDB`. |
114 | | subscription_manager: Mutex<Option<Arc<ExperimentalMongoSubscriptionManager>>>, |
115 | | } |
116 | | |
117 | | impl ExperimentalMongoStore { |
118 | | /// Create a new `ExperimentalMongoStore` from the given configuration. |
119 | 3 | pub async fn new(mut spec: ExperimentalMongoSpec) -> Result<Arc<Self>, Error> { |
120 | | // Set defaults |
121 | 3 | if spec.connection_string.is_empty() { Branch (121:12): [True: 0, False: 0]
Branch (121:12): [True: 1, False: 2]
Branch (121:12): [Folded - Ignored]
|
122 | 1 | return Err(make_err!( |
123 | 1 | Code::InvalidArgument, |
124 | 1 | "No connection string was specified in mongo store configuration." |
125 | 1 | )); |
126 | 2 | } |
127 | | |
128 | 2 | if spec.database.is_empty() { Branch (128:12): [True: 0, False: 0]
Branch (128:12): [True: 0, False: 2]
Branch (128:12): [Folded - Ignored]
|
129 | 0 | spec.database = DEFAULT_DB_NAME.to_string(); |
130 | 2 | } |
131 | | |
132 | 2 | if spec.cas_collection.is_empty() { Branch (132:12): [True: 0, False: 0]
Branch (132:12): [True: 0, False: 2]
Branch (132:12): [Folded - Ignored]
|
133 | 0 | spec.cas_collection = DEFAULT_COLLECTION_NAME.to_string(); |
134 | 2 | } |
135 | | |
136 | 2 | if spec.scheduler_collection.is_empty() { Branch (136:12): [True: 0, False: 0]
Branch (136:12): [True: 0, False: 2]
Branch (136:12): [Folded - Ignored]
|
137 | 0 | spec.scheduler_collection = DEFAULT_SCHEDULER_COLLECTION_NAME.to_string(); |
138 | 2 | } |
139 | | |
140 | 2 | if spec.read_chunk_size == 0 { Branch (140:12): [True: 0, False: 0]
Branch (140:12): [True: 0, False: 2]
Branch (140:12): [Folded - Ignored]
|
141 | 0 | spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE; |
142 | 2 | } |
143 | | |
144 | 2 | if spec.connection_timeout_ms == 0 { Branch (144:12): [True: 0, False: 0]
Branch (144:12): [True: 0, False: 2]
Branch (144:12): [Folded - Ignored]
|
145 | 0 | spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS; |
146 | 2 | } |
147 | | |
148 | 2 | if spec.command_timeout_ms == 0 { Branch (148:12): [True: 0, False: 0]
Branch (148:12): [True: 0, False: 2]
Branch (148:12): [Folded - Ignored]
|
149 | 0 | spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS; |
150 | 2 | } |
151 | | |
152 | 2 | if spec.max_concurrent_uploads == 0 { Branch (152:12): [True: 0, False: 0]
Branch (152:12): [True: 0, False: 2]
Branch (152:12): [Folded - Ignored]
|
153 | 0 | spec.max_concurrent_uploads = DEFAULT_MAX_CONCURRENT_UPLOADS; |
154 | 2 | } |
155 | | |
156 | | // Configure client options |
157 | 2 | let mut client_options1 = ClientOptions::parse(&spec.connection_string) |
158 | 2 | .await |
159 | 2 | .map_err(|e| {1 |
160 | 1 | make_err!( |
161 | 1 | Code::InvalidArgument, |
162 | | "Failed to parse MongoDB connection string: {e}" |
163 | | ) |
164 | 1 | })?; |
165 | | |
166 | 1 | client_options.server_selection_timeout = |
167 | 1 | Some(Duration::from_millis(spec.connection_timeout_ms)); |
168 | 1 | client_options.connect_timeout = Some(Duration::from_millis(spec.connection_timeout_ms)); |
169 | | |
170 | | // Set write concern if specified |
171 | 1 | if let Some(w0 ) = spec.write_concern_w { Branch (171:16): [True: 0, False: 0]
Branch (171:16): [True: 0, False: 1]
Branch (171:16): [Folded - Ignored]
|
172 | | // Parse write concern w - can be a number or string like "majority" |
173 | 0 | let w_value = if let Ok(num) = w.parse::<u32>() { Branch (173:34): [True: 0, False: 0]
Branch (173:34): [True: 0, False: 0]
Branch (173:34): [Folded - Ignored]
|
174 | 0 | Some(num.into()) |
175 | | } else { |
176 | 0 | Some(w.into()) |
177 | | }; |
178 | | |
179 | | // Build write concern based on which options are set |
180 | 0 | let write_concern = match (w_value, spec.write_concern_j, spec.write_concern_timeout_ms) |
181 | | { |
182 | 0 | (Some(w), Some(j), Some(timeout)) => WriteConcern::builder() |
183 | 0 | .w(Some(w)) |
184 | 0 | .journal(j) |
185 | 0 | .w_timeout(Some(Duration::from_millis(u64::from(timeout)))) |
186 | 0 | .build(), |
187 | 0 | (Some(w), Some(j), None) => WriteConcern::builder().w(Some(w)).journal(j).build(), |
188 | 0 | (Some(w), None, Some(timeout)) => WriteConcern::builder() |
189 | 0 | .w(Some(w)) |
190 | 0 | .w_timeout(Some(Duration::from_millis(u64::from(timeout)))) |
191 | 0 | .build(), |
192 | 0 | (Some(w), None, None) => WriteConcern::builder().w(Some(w)).build(), |
193 | 0 | _ => unreachable!(), // We know w is Some because we're in the if let Some(w) block |
194 | | }; |
195 | | |
196 | 0 | client_options.write_concern = Some(write_concern); |
197 | 1 | } else if spec.write_concern_j.is_some() || spec.write_concern_timeout_ms0 .is_some0 () { Branch (197:19): [True: 0, False: 0]
Branch (197:53): [True: 0, False: 0]
Branch (197:19): [True: 1, False: 0]
Branch (197:53): [True: 0, False: 0]
Branch (197:19): [Folded - Ignored]
Branch (197:53): [Folded - Ignored]
|
198 | 1 | return Err(make_err!( |
199 | 1 | Code::InvalidArgument, |
200 | 1 | "write_concern_w not set, but j and/or timeout set. Please set 'write_concern_w' to a non-default value. See https://www.mongodb.com/docs/manual/reference/write-concern/#w-option for options." |
201 | 1 | )); |
202 | 0 | } |
203 | | |
204 | | // Create client |
205 | 0 | let client = MongoClient::with_options(client_options).map_err(|e| { |
206 | 0 | make_err!( |
207 | 0 | Code::InvalidArgument, |
208 | | "Failed to create MongoDB client: {e}" |
209 | | ) |
210 | 0 | })?; |
211 | | |
212 | | // Get database and collections |
213 | 0 | let database = client.database(&spec.database); |
214 | 0 | let cas_collection = database.collection::<Document>(&spec.cas_collection); |
215 | 0 | let scheduler_collection = database.collection::<Document>(&spec.scheduler_collection); |
216 | | |
217 | | // Create indexes |
218 | 0 | Self::create_indexes(&cas_collection, &scheduler_collection).await?; |
219 | | |
220 | 0 | let store = Self { |
221 | 0 | client, |
222 | 0 | database, |
223 | 0 | cas_collection, |
224 | 0 | scheduler_collection, |
225 | 0 | key_prefix: spec.key_prefix.clone().unwrap_or_default(), |
226 | 0 | read_chunk_size: spec.read_chunk_size, |
227 | 0 | max_concurrent_uploads: spec.max_concurrent_uploads, |
228 | 0 | enable_change_streams: spec.enable_change_streams, |
229 | 0 | subscription_manager: Mutex::new(None), |
230 | 0 | }; |
231 | | |
232 | 0 | Ok(Arc::new(store)) |
233 | 3 | } |
234 | | |
235 | | /// Create necessary indexes for efficient operations. |
236 | 0 | async fn create_indexes( |
237 | 0 | cas_collection: &Collection<Document>, |
238 | 0 | _scheduler_collection: &Collection<Document>, |
239 | 0 | ) -> Result<(), Error> { |
240 | | // CAS collection indexes |
241 | 0 | cas_collection |
242 | 0 | .create_index(IndexModel::builder().keys(doc! { SIZE_FIELD: 1 }).build()) |
243 | 0 | .await |
244 | 0 | .map_err(|e| { |
245 | 0 | make_err!( |
246 | 0 | Code::Internal, |
247 | | "Failed to create size index on CAS collection: {e}" |
248 | | ) |
249 | 0 | })?; |
250 | | |
251 | | // Scheduler collection will have dynamic indexes created as needed |
252 | | |
253 | 0 | Ok(()) |
254 | 0 | } |
255 | | |
256 | | /// Encode a [`StoreKey`] so it can be sent to `MongoDB`. |
257 | 0 | fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> { |
258 | 0 | let key_body = key.as_str(); |
259 | 0 | if self.key_prefix.is_empty() { Branch (259:12): [True: 0, False: 0]
Branch (259:12): [Folded - Ignored]
|
260 | 0 | key_body |
261 | | } else { |
262 | 0 | match key_body { |
263 | 0 | Cow::Owned(mut encoded_key) => { |
264 | 0 | encoded_key.insert_str(0, &self.key_prefix); |
265 | 0 | Cow::Owned(encoded_key) |
266 | | } |
267 | 0 | Cow::Borrowed(body) => { |
268 | 0 | let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len()); |
269 | 0 | encoded_key.push_str(&self.key_prefix); |
270 | 0 | encoded_key.push_str(body); |
271 | 0 | Cow::Owned(encoded_key) |
272 | | } |
273 | | } |
274 | | } |
275 | 0 | } |
276 | | |
277 | | /// Decode a key from `MongoDB` by removing the prefix. |
278 | 0 | fn decode_key(&self, key: &str) -> Option<String> { |
279 | 0 | if self.key_prefix.is_empty() { Branch (279:12): [True: 0, False: 0]
Branch (279:12): [Folded - Ignored]
|
280 | 0 | Some(key.to_string()) |
281 | | } else { |
282 | 0 | key.strip_prefix(&self.key_prefix).map(ToString::to_string) |
283 | | } |
284 | 0 | } |
285 | | } |
286 | | |
287 | | #[async_trait] |
288 | | impl StoreDriver for ExperimentalMongoStore { |
289 | | async fn has_with_results( |
290 | | self: Pin<&Self>, |
291 | | keys: &[StoreKey<'_>], |
292 | | results: &mut [Option<u64>], |
293 | 0 | ) -> Result<(), Error> { |
294 | 0 | for (key, result) in keys.iter().zip(results.iter_mut()) { |
295 | | // Handle zero digest specially |
296 | 0 | if is_zero_digest(key.borrow()) { Branch (296:16): [True: 0, False: 0]
Branch (296:16): [Folded - Ignored]
|
297 | 0 | *result = Some(0); |
298 | 0 | continue; |
299 | 0 | } |
300 | | |
301 | 0 | let encoded_key = self.encode_key(key); |
302 | 0 | let filter = doc! { KEY_FIELD: encoded_key.as_ref() }; |
303 | | |
304 | 0 | match self.cas_collection.find_one(filter).await { |
305 | 0 | Ok(Some(doc)) => { |
306 | 0 | *result = doc.get_i64(SIZE_FIELD).ok().map(|v| v as u64); |
307 | | } |
308 | 0 | Ok(None) => { |
309 | 0 | *result = None; |
310 | 0 | } |
311 | 0 | Err(e) => { |
312 | 0 | return Err(make_err!( |
313 | 0 | Code::Internal, |
314 | 0 | "MongoDB error in has_with_results: {e}" |
315 | 0 | )); |
316 | | } |
317 | | } |
318 | | } |
319 | 0 | Ok(()) |
320 | 0 | } |
321 | | |
322 | | async fn list( |
323 | | self: Pin<&Self>, |
324 | | range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>), |
325 | | handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_), |
326 | 0 | ) -> Result<u64, Error> { |
327 | 0 | let mut filter = Document::new(); |
328 | | |
329 | | // Build range query |
330 | 0 | let mut key_filter = Document::new(); |
331 | 0 | match &range.0 { |
332 | 0 | Bound::Included(start) => { |
333 | 0 | let encoded = self.encode_key(start); |
334 | 0 | key_filter.insert("$gte", encoded.as_ref()); |
335 | 0 | } |
336 | 0 | Bound::Excluded(start) => { |
337 | 0 | let encoded = self.encode_key(start); |
338 | 0 | key_filter.insert("$gt", encoded.as_ref()); |
339 | 0 | } |
340 | 0 | Bound::Unbounded => {} |
341 | | } |
342 | | |
343 | 0 | match &range.1 { |
344 | 0 | Bound::Included(end) => { |
345 | 0 | let encoded = self.encode_key(end); |
346 | 0 | key_filter.insert("$lte", encoded.as_ref()); |
347 | 0 | } |
348 | 0 | Bound::Excluded(end) => { |
349 | 0 | let encoded = self.encode_key(end); |
350 | 0 | key_filter.insert("$lt", encoded.as_ref()); |
351 | 0 | } |
352 | 0 | Bound::Unbounded => {} |
353 | | } |
354 | | |
355 | 0 | if !key_filter.is_empty() { Branch (355:12): [True: 0, False: 0]
Branch (355:12): [Folded - Ignored]
|
356 | 0 | filter.insert(KEY_FIELD, key_filter); |
357 | 0 | } |
358 | | |
359 | | // Add prefix filter if needed |
360 | 0 | if !self.key_prefix.is_empty() { Branch (360:12): [True: 0, False: 0]
Branch (360:12): [Folded - Ignored]
|
361 | 0 | let regex_filter = doc! { |
362 | | KEY_FIELD: { |
363 | 0 | "$regex": format!("^{}", regex::escape(&self.key_prefix)), |
364 | | } |
365 | | }; |
366 | 0 | if filter.is_empty() { Branch (366:16): [True: 0, False: 0]
Branch (366:16): [Folded - Ignored]
|
367 | 0 | filter = regex_filter; |
368 | 0 | } else { |
369 | 0 | filter = doc! { "$and": [filter, regex_filter] }; |
370 | 0 | } |
371 | 0 | } |
372 | | |
373 | 0 | let mut cursor = self |
374 | 0 | .cas_collection |
375 | 0 | .find(filter) |
376 | 0 | .projection(doc! { KEY_FIELD: 1 }) |
377 | 0 | .await |
378 | 0 | .map_err(|e| make_err!(Code::Internal, "Failed to create cursor in list: {e}"))?; |
379 | | |
380 | 0 | let mut count = 0u64; |
381 | 0 | while let Some(doc) = cursor Branch (381:19): [True: 0, False: 0]
Branch (381:19): [Folded - Ignored]
|
382 | 0 | .try_next() |
383 | 0 | .await |
384 | 0 | .map_err(|e| make_err!(Code::Internal, "Failed to get next document in list: {e}"))? |
385 | | { |
386 | 0 | if let Ok(key) = doc.get_str(KEY_FIELD) { Branch (386:20): [True: 0, False: 0]
Branch (386:20): [Folded - Ignored]
|
387 | 0 | if let Some(decoded_key) = self.decode_key(key) { Branch (387:24): [True: 0, False: 0]
Branch (387:24): [Folded - Ignored]
|
388 | 0 | let store_key = StoreKey::new_str(&decoded_key); |
389 | 0 | count += 1; |
390 | 0 | if !handler(&store_key) { Branch (390:24): [True: 0, False: 0]
Branch (390:24): [Folded - Ignored]
|
391 | 0 | break; |
392 | 0 | } |
393 | 0 | } |
394 | 0 | } |
395 | | } |
396 | | |
397 | 0 | Ok(count) |
398 | 0 | } |
399 | | |
400 | | async fn update( |
401 | | self: Pin<&Self>, |
402 | | key: StoreKey<'_>, |
403 | | mut reader: DropCloserReadHalf, |
404 | | upload_size: UploadSizeInfo, |
405 | 0 | ) -> Result<(), Error> { |
406 | 0 | let encoded_key = self.encode_key(&key); |
407 | | |
408 | | // Handle zero digest |
409 | 0 | if is_zero_digest(key.borrow()) { Branch (409:12): [True: 0, False: 0]
Branch (409:12): [Folded - Ignored]
|
410 | 0 | let chunk = reader.peek().await.map_err(|e| { |
411 | 0 | make_err!( |
412 | 0 | Code::Internal, |
413 | | "Failed to peek in ExperimentalMongoStore::update: {e}" |
414 | | ) |
415 | 0 | })?; |
416 | 0 | if chunk.is_empty() { Branch (416:16): [True: 0, False: 0]
Branch (416:16): [Folded - Ignored]
|
417 | 0 | reader.drain().await.map_err(|e| { |
418 | 0 | make_err!( |
419 | 0 | Code::Internal, |
420 | | "Failed to drain in ExperimentalMongoStore::update: {e}" |
421 | | ) |
422 | 0 | })?; |
423 | 0 | return Ok(()); |
424 | 0 | } |
425 | 0 | } |
426 | | |
427 | | // Special handling for MaxSize(0) - this should error if stream is closed |
428 | 0 | if upload_size == UploadSizeInfo::MaxSize(0) { Branch (428:12): [True: 0, False: 0]
Branch (428:12): [Folded - Ignored]
|
429 | | // Try to read from the stream - if it's closed immediately, this should error |
430 | 0 | match reader.recv().await { |
431 | 0 | Ok(_chunk) => { |
432 | 0 | return Err(make_input_err!( |
433 | 0 | "Received data when MaxSize is 0 in MongoDB store" |
434 | 0 | )); |
435 | | } |
436 | 0 | Err(e) => { |
437 | 0 | return Err(make_err!( |
438 | 0 | Code::InvalidArgument, |
439 | 0 | "Stream closed for MaxSize(0) upload: {e}" |
440 | 0 | )); |
441 | | } |
442 | | } |
443 | 0 | } |
444 | | |
445 | | // Read all data into memory with proper EOF handling |
446 | 0 | let mut data = Vec::new(); |
447 | 0 | while let Ok(chunk) = reader.recv().await { Branch (447:19): [True: 0, False: 0]
Branch (447:19): [Folded - Ignored]
|
448 | 0 | if chunk.is_empty() { Branch (448:16): [True: 0, False: 0]
Branch (448:16): [Folded - Ignored]
|
449 | 0 | break; // Empty chunk signals EOF |
450 | 0 | } |
451 | 0 | data.extend_from_slice(&chunk); |
452 | | } |
453 | | |
454 | 0 | let size = data.len() as i64; |
455 | | |
456 | | // Create document |
457 | 0 | let doc = doc! { |
458 | 0 | KEY_FIELD: encoded_key.as_ref(), |
459 | 0 | DATA_FIELD: Bson::Binary(mongodb::bson::Binary { |
460 | 0 | subtype: mongodb::bson::spec::BinarySubtype::Generic, |
461 | 0 | bytes: data, |
462 | 0 | }), |
463 | 0 | SIZE_FIELD: size, |
464 | | }; |
465 | | |
466 | | // Upsert the document |
467 | 0 | self.cas_collection |
468 | 0 | .update_one( |
469 | 0 | doc! { KEY_FIELD: encoded_key.as_ref() }, |
470 | 0 | doc! { "$set": doc }, |
471 | 0 | ) |
472 | 0 | .upsert(true) |
473 | 0 | .await |
474 | 0 | .map_err(|e| make_err!(Code::Internal, "Failed to update document in MongoDB: {e}"))?; |
475 | | |
476 | 0 | Ok(()) |
477 | 0 | } |
478 | | |
479 | | async fn get_part( |
480 | | self: Pin<&Self>, |
481 | | key: StoreKey<'_>, |
482 | | writer: &mut DropCloserWriteHalf, |
483 | | offset: u64, |
484 | | length: Option<u64>, |
485 | 0 | ) -> Result<(), Error> { |
486 | | // Handle zero digest |
487 | 0 | if is_zero_digest(key.borrow()) { Branch (487:12): [True: 0, False: 0]
Branch (487:12): [Folded - Ignored]
|
488 | 0 | return writer.send_eof().map_err(|e| { |
489 | 0 | make_err!( |
490 | 0 | Code::Internal, |
491 | | "Failed to send zero EOF in mongo store get_part: {e}" |
492 | | ) |
493 | 0 | }); |
494 | 0 | } |
495 | | |
496 | 0 | let encoded_key = self.encode_key(&key); |
497 | 0 | let filter = doc! { KEY_FIELD: encoded_key.as_ref() }; |
498 | | |
499 | 0 | let doc = self |
500 | 0 | .cas_collection |
501 | 0 | .find_one(filter) |
502 | 0 | .await |
503 | 0 | .map_err(|e| make_err!(Code::Internal, "Failed to find document in get_part: {e}"))? |
504 | 0 | .ok_or_else(|| { |
505 | 0 | make_err!( |
506 | 0 | Code::NotFound, |
507 | | "Data not found in MongoDB store for digest: {key:?}" |
508 | | ) |
509 | 0 | })?; |
510 | | |
511 | 0 | let data = match doc.get(DATA_FIELD) { |
512 | 0 | Some(Bson::Binary(binary)) => &binary.bytes, |
513 | | _ => { |
514 | 0 | return Err(make_err!( |
515 | 0 | Code::Internal, |
516 | 0 | "Invalid data field in MongoDB document" |
517 | 0 | )); |
518 | | } |
519 | | }; |
520 | | |
521 | 0 | let offset = offset as usize; |
522 | 0 | let data_len = data.len(); |
523 | | |
524 | 0 | if offset > data_len { Branch (524:12): [True: 0, False: 0]
Branch (524:12): [Folded - Ignored]
|
525 | 0 | return writer.send_eof().map_err(|e| { |
526 | 0 | make_err!( |
527 | 0 | Code::Internal, |
528 | | "Failed to send EOF in mongo store get_part: {e}" |
529 | | ) |
530 | 0 | }); |
531 | 0 | } |
532 | | |
533 | 0 | let end = if let Some(len) = length { Branch (533:26): [True: 0, False: 0]
Branch (533:26): [Folded - Ignored]
|
534 | 0 | cmp::min(offset + len as usize, data_len) |
535 | | } else { |
536 | 0 | data_len |
537 | | }; |
538 | | |
539 | 0 | if offset < end { Branch (539:12): [True: 0, False: 0]
Branch (539:12): [Folded - Ignored]
|
540 | 0 | let chunk = &data[offset..end]; |
541 | | |
542 | | // Send data in chunks |
543 | 0 | for chunk_data in chunk.chunks(self.read_chunk_size) { |
544 | 0 | writer.send(chunk_data.to_vec().into()).await.map_err(|e| { |
545 | 0 | make_err!( |
546 | 0 | Code::Internal, |
547 | | "Failed to write data in ExperimentalMongoStore::get_part: {e}" |
548 | | ) |
549 | 0 | })?; |
550 | | } |
551 | 0 | } |
552 | | |
553 | 0 | writer.send_eof().map_err(|e| { |
554 | 0 | make_err!( |
555 | 0 | Code::Internal, |
556 | | "Failed to write EOF in mongo store get_part: {e}" |
557 | | ) |
558 | 0 | }) |
559 | 0 | } |
560 | | |
561 | 0 | fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver { |
562 | 0 | self |
563 | 0 | } |
564 | | |
565 | 0 | fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) { |
566 | 0 | self |
567 | 0 | } |
568 | | |
569 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> { |
570 | 0 | self |
571 | 0 | } |
572 | | |
573 | 0 | fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) { |
574 | 0 | registry.register_indicator(self); |
575 | 0 | } |
576 | | } |
577 | | |
578 | | #[async_trait] |
579 | | impl HealthStatusIndicator for ExperimentalMongoStore { |
580 | 0 | fn get_name(&self) -> &'static str { |
581 | 0 | "ExperimentalMongoStore" |
582 | 0 | } |
583 | | |
584 | 0 | async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { |
585 | 0 | match self.database.run_command(doc! { "ping": 1 }).await { |
586 | 0 | Ok(_) => HealthStatus::new_ok(self, "Connection healthy".into()), |
587 | 0 | Err(e) => HealthStatus::new_failed( |
588 | 0 | self, |
589 | 0 | format!("{namespace} - MongoDB connection error: {e}").into(), |
590 | | ), |
591 | | } |
592 | 0 | } |
593 | | } |
594 | | |
595 | | // ------------------------------------------------------------------- |
596 | | // |
597 | | // `ExperimentalMongoDB` scheduler implementation. Likely to change |
598 | | // ------------------------------------------------------------------- |
599 | | |
600 | | /// An individual subscription to a key in `ExperimentalMongoDB`. |
601 | | #[derive(Debug)] |
602 | | pub struct ExperimentalMongoSubscription { |
603 | | receiver: Option<watch::Receiver<String>>, |
604 | | weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<ExperimentalMongoSubscriptionPublisher>>>, |
605 | | } |
606 | | |
607 | | impl SchedulerSubscription for ExperimentalMongoSubscription { |
608 | 0 | async fn changed(&mut self) -> Result<(), Error> { |
609 | 0 | let receiver = self.receiver.as_mut().ok_or_else(|| { |
610 | 0 | make_err!( |
611 | 0 | Code::Internal, |
612 | | "In ExperimentalMongoSubscription::changed::as_mut" |
613 | | ) |
614 | 0 | })?; |
615 | 0 | receiver.changed().await.map_err(|_| { |
616 | 0 | make_err!( |
617 | 0 | Code::Internal, |
618 | | "In ExperimentalMongoSubscription::changed::changed" |
619 | | ) |
620 | 0 | }) |
621 | 0 | } |
622 | | } |
623 | | |
624 | | impl Drop for ExperimentalMongoSubscription { |
625 | 0 | fn drop(&mut self) { |
626 | 0 | let Some(receiver) = self.receiver.take() else { Branch (626:13): [True: 0, False: 0]
Branch (626:13): [Folded - Ignored]
|
627 | 0 | warn!("ExperimentalMongoSubscription has already been dropped, nothing to do."); |
628 | 0 | return; |
629 | | }; |
630 | 0 | let key = receiver.borrow().clone(); |
631 | 0 | drop(receiver); |
632 | | |
633 | 0 | let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else { Branch (633:13): [True: 0, False: 0]
Branch (633:13): [Folded - Ignored]
|
634 | 0 | return; |
635 | | }; |
636 | 0 | let mut subscribed_keys = subscribed_keys.write(); |
637 | 0 | let Some(value) = subscribed_keys.get(&key) else { Branch (637:13): [True: 0, False: 0]
Branch (637:13): [Folded - Ignored]
|
638 | 0 | error!( |
639 | 0 | "Key {key} was not found in subscribed keys when checking if it should be removed." |
640 | | ); |
641 | 0 | return; |
642 | | }; |
643 | | |
644 | 0 | if value.receiver_count() == 0 { Branch (644:12): [True: 0, False: 0]
Branch (644:12): [Folded - Ignored]
|
645 | 0 | subscribed_keys.remove(key); |
646 | 0 | } |
647 | 0 | } |
648 | | } |
649 | | |
650 | | /// A publisher for a key in `ExperimentalMongoDB`. |
651 | | #[derive(Debug)] |
652 | | struct ExperimentalMongoSubscriptionPublisher { |
653 | | sender: Mutex<watch::Sender<String>>, |
654 | | } |
655 | | |
656 | | impl ExperimentalMongoSubscriptionPublisher { |
657 | 0 | fn new( |
658 | 0 | key: String, |
659 | 0 | weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>, |
660 | 0 | ) -> (Self, ExperimentalMongoSubscription) { |
661 | 0 | let (sender, receiver) = watch::channel(key); |
662 | 0 | let publisher = Self { |
663 | 0 | sender: Mutex::new(sender), |
664 | 0 | }; |
665 | 0 | let subscription = ExperimentalMongoSubscription { |
666 | 0 | receiver: Some(receiver), |
667 | 0 | weak_subscribed_keys, |
668 | 0 | }; |
669 | 0 | (publisher, subscription) |
670 | 0 | } |
671 | | |
672 | 0 | fn subscribe( |
673 | 0 | &self, |
674 | 0 | weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>, |
675 | 0 | ) -> ExperimentalMongoSubscription { |
676 | 0 | let receiver = self.sender.lock().subscribe(); |
677 | 0 | ExperimentalMongoSubscription { |
678 | 0 | receiver: Some(receiver), |
679 | 0 | weak_subscribed_keys, |
680 | 0 | } |
681 | 0 | } |
682 | | |
683 | 0 | fn receiver_count(&self) -> usize { |
684 | 0 | self.sender.lock().receiver_count() |
685 | 0 | } |
686 | | |
687 | 0 | fn notify(&self) { |
688 | 0 | self.sender.lock().send_modify(|_| {}); |
689 | 0 | } |
690 | | } |
691 | | |
692 | | #[derive(Debug)] |
693 | | pub struct ExperimentalMongoSubscriptionManager { |
694 | | subscribed_keys: Arc<RwLock<StringPatriciaMap<ExperimentalMongoSubscriptionPublisher>>>, |
695 | | _subscription_spawn: JoinHandleDropGuard<()>, |
696 | | } |
697 | | |
698 | | impl ExperimentalMongoSubscriptionManager { |
699 | 0 | pub fn new(database: Database, collection_name: String, key_prefix: String) -> Self { |
700 | 0 | let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new())); |
701 | 0 | let subscribed_keys_weak = Arc::downgrade(&subscribed_keys); |
702 | | |
703 | | Self { |
704 | 0 | subscribed_keys, |
705 | 0 | _subscription_spawn: spawn!("mongo_subscribe_spawn", async move { |
706 | 0 | let collection = database.collection::<Document>(&collection_name); |
707 | | |
708 | | loop { |
709 | | // Try to create change stream |
710 | 0 | let change_stream_result = collection.watch().await; |
711 | | |
712 | 0 | match change_stream_result { |
713 | 0 | Ok(mut change_stream) => { |
714 | 0 | info!("MongoDB change stream connected"); |
715 | | |
716 | 0 | while let Some(event_result) = change_stream.next().await { Branch (716:39): [True: 0, False: 0]
Branch (716:39): [Folded - Ignored]
|
717 | 0 | match event_result { |
718 | 0 | Ok(event) => { |
719 | | use mongodb::change_stream::event::OperationType; |
720 | 0 | match event.operation_type { |
721 | | OperationType::Insert |
722 | | | OperationType::Update |
723 | | | OperationType::Replace |
724 | | | OperationType::Delete => { |
725 | 0 | if let Some(doc_key) = event.document_key { Branch (725:56): [True: 0, False: 0]
Branch (725:56): [Folded - Ignored]
|
726 | 0 | if let Ok(key) = doc_key.get_str(KEY_FIELD) { Branch (726:60): [True: 0, False: 0]
Branch (726:60): [Folded - Ignored]
|
727 | | // Remove prefix if present |
728 | 0 | let key = if key_prefix.is_empty() { Branch (728:70): [True: 0, False: 0]
Branch (728:70): [Folded - Ignored]
|
729 | 0 | key |
730 | | } else { |
731 | 0 | key.strip_prefix(&key_prefix) |
732 | 0 | .unwrap_or(key) |
733 | | }; |
734 | | |
735 | 0 | let Some(subscribed_keys) = Branch (735:61): [True: 0, False: 0]
Branch (735:61): [Folded - Ignored]
|
736 | 0 | subscribed_keys_weak.upgrade() |
737 | | else { |
738 | 0 | warn!( |
739 | 0 | "Parent dropped, exiting ExperimentalMongoSubscriptionManager" |
740 | | ); |
741 | 0 | return; |
742 | | }; |
743 | | |
744 | 0 | let subscribed_keys_mux = |
745 | 0 | subscribed_keys.read(); |
746 | 0 | subscribed_keys_mux |
747 | 0 | .common_prefix_values(key) |
748 | 0 | .for_each(ExperimentalMongoSubscriptionPublisher::notify); |
749 | 0 | } |
750 | 0 | } |
751 | | } |
752 | 0 | _ => {} |
753 | | } |
754 | | } |
755 | 0 | Err(e) => { |
756 | 0 | error!("Error in change stream: {e}"); |
757 | 0 | break; |
758 | | } |
759 | | } |
760 | | } |
761 | | } |
762 | 0 | Err(e) => { |
763 | 0 | warn!("Failed to create change stream: {e}. Will retry in 5 seconds."); |
764 | | } |
765 | | } |
766 | | |
767 | | // Check if parent is still alive |
768 | 0 | if subscribed_keys_weak.upgrade().is_none() { Branch (768:24): [True: 0, False: 0]
Branch (768:24): [Folded - Ignored]
|
769 | 0 | warn!("Parent dropped, exiting ExperimentalMongoSubscriptionManager"); |
770 | 0 | return; |
771 | 0 | } |
772 | | |
773 | | // Sleep before retry |
774 | 0 | sleep(Duration::from_secs(5)).await; |
775 | | |
776 | | // Notify all subscribers on reconnection |
777 | 0 | if let Some(subscribed_keys) = subscribed_keys_weak.upgrade() { Branch (777:28): [True: 0, False: 0]
Branch (777:28): [Folded - Ignored]
|
778 | 0 | let subscribed_keys_mux = subscribed_keys.read(); |
779 | 0 | for publisher in subscribed_keys_mux.values() { |
780 | 0 | publisher.notify(); |
781 | 0 | } |
782 | 0 | } |
783 | | } |
784 | 0 | }), |
785 | | } |
786 | 0 | } |
787 | | } |
788 | | |
789 | | impl SchedulerSubscriptionManager for ExperimentalMongoSubscriptionManager { |
790 | | type Subscription = ExperimentalMongoSubscription; |
791 | | |
792 | 0 | fn notify_for_test(&self, value: String) { |
793 | 0 | let subscribed_keys_mux = self.subscribed_keys.read(); |
794 | 0 | subscribed_keys_mux |
795 | 0 | .common_prefix_values(&value) |
796 | 0 | .for_each(ExperimentalMongoSubscriptionPublisher::notify); |
797 | 0 | } |
798 | | |
799 | 0 | fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error> |
800 | 0 | where |
801 | 0 | K: SchedulerStoreKeyProvider, |
802 | | { |
803 | 0 | let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys); |
804 | 0 | let mut subscribed_keys = self.subscribed_keys.write(); |
805 | 0 | let key = key.get_key(); |
806 | 0 | let key_str = key.as_str(); |
807 | | |
808 | 0 | let mut subscription = if let Some(publisher) = subscribed_keys.get(&key_str) { Branch (808:39): [Folded - Ignored]
Branch (808:39): [Folded - Ignored]
|
809 | 0 | publisher.subscribe(weak_subscribed_keys) |
810 | | } else { |
811 | 0 | let (publisher, subscription) = ExperimentalMongoSubscriptionPublisher::new( |
812 | 0 | key_str.to_string(), |
813 | 0 | weak_subscribed_keys, |
814 | 0 | ); |
815 | 0 | subscribed_keys.insert(key_str, publisher); |
816 | 0 | subscription |
817 | | }; |
818 | | |
819 | 0 | subscription |
820 | 0 | .receiver |
821 | 0 | .as_mut() |
822 | 0 | .ok_or_else(|| { |
823 | 0 | make_err!( |
824 | 0 | Code::Internal, |
825 | | "Receiver should be set in ExperimentalMongoSubscriptionManager::subscribe" |
826 | | ) |
827 | 0 | })? |
828 | 0 | .mark_changed(); |
829 | | |
830 | 0 | Ok(subscription) |
831 | 0 | } |
832 | | } |
833 | | |
834 | | impl SchedulerStore for ExperimentalMongoStore { |
835 | | type SubscriptionManager = ExperimentalMongoSubscriptionManager; |
836 | | |
837 | 0 | fn subscription_manager(&self) -> Result<Arc<ExperimentalMongoSubscriptionManager>, Error> { |
838 | 0 | let mut subscription_manager = self.subscription_manager.lock(); |
839 | 0 | if let Some(subscription_manager) = &*subscription_manager { Branch (839:16): [True: 0, False: 0]
Branch (839:16): [Folded - Ignored]
|
840 | 0 | Ok(subscription_manager.clone()) |
841 | | } else { |
842 | 0 | if !self.enable_change_streams { Branch (842:16): [True: 0, False: 0]
Branch (842:16): [Folded - Ignored]
|
843 | 0 | return Err(make_input_err!( |
844 | 0 | "ExperimentalMongoStore must have change streams enabled for scheduler subscriptions" |
845 | 0 | )); |
846 | 0 | } |
847 | | |
848 | 0 | let sub = Arc::new(ExperimentalMongoSubscriptionManager::new( |
849 | 0 | self.database.clone(), |
850 | 0 | self.scheduler_collection.name().to_string(), |
851 | 0 | self.key_prefix.clone(), |
852 | | )); |
853 | 0 | *subscription_manager = Some(sub.clone()); |
854 | 0 | Ok(sub) |
855 | | } |
856 | 0 | } |
857 | | |
858 | 0 | async fn update_data<T>(&self, data: T) -> Result<Option<u64>, Error> |
859 | 0 | where |
860 | 0 | T: SchedulerStoreDataProvider |
861 | 0 | + SchedulerStoreKeyProvider |
862 | 0 | + SchedulerCurrentVersionProvider |
863 | 0 | + Send, |
864 | 0 | { |
865 | 0 | let key = data.get_key(); |
866 | 0 | let encoded_key = self.encode_key(&key); |
867 | 0 | let maybe_index = data.get_indexes().map_err(|e| { |
868 | 0 | make_err!( |
869 | 0 | Code::Internal, |
870 | | "Error getting indexes in ExperimentalMongoStore::update_data: {e}" |
871 | | ) |
872 | 0 | })?; |
873 | | |
874 | 0 | if <T as SchedulerStoreKeyProvider>::Versioned::VALUE { Branch (874:12): [Folded - Ignored]
Branch (874:12): [True: 0, False: 0]
Branch (874:12): [Folded - Ignored]
|
875 | 0 | let current_version = data.current_version(); |
876 | 0 | let data_bytes = data.try_into_bytes().map_err(|e| { |
877 | 0 | make_err!( |
878 | 0 | Code::Internal, |
879 | | "Could not convert value to bytes in ExperimentalMongoStore::update_data: {e}" |
880 | | ) |
881 | 0 | })?; |
882 | | |
883 | 0 | let mut update_doc = doc! { |
884 | | "$set": { |
885 | 0 | DATA_FIELD: Bson::Binary(mongodb::bson::Binary { |
886 | 0 | subtype: mongodb::bson::spec::BinarySubtype::Generic, |
887 | 0 | bytes: data_bytes.to_vec(), |
888 | 0 | }), |
889 | | }, |
890 | | "$inc": { |
891 | | VERSION_FIELD: 1i64, |
892 | | } |
893 | | }; |
894 | | |
895 | | // Add indexes |
896 | 0 | for (name, value) in maybe_index { |
897 | 0 | update_doc.get_document_mut("$set").unwrap().insert( |
898 | 0 | name, |
899 | 0 | Bson::Binary(mongodb::bson::Binary { |
900 | 0 | subtype: mongodb::bson::spec::BinarySubtype::Generic, |
901 | 0 | bytes: value.to_vec(), |
902 | 0 | }), |
903 | 0 | ); |
904 | 0 | } |
905 | | |
906 | 0 | let filter = doc! { |
907 | 0 | KEY_FIELD: encoded_key.as_ref(), |
908 | 0 | VERSION_FIELD: current_version as i64, |
909 | | }; |
910 | | |
911 | 0 | match self |
912 | 0 | .scheduler_collection |
913 | 0 | .find_one_and_update(filter, update_doc) |
914 | 0 | .upsert(true) |
915 | 0 | .return_document(ReturnDocument::After) |
916 | 0 | .await |
917 | | { |
918 | 0 | Ok(Some(doc)) => { |
919 | 0 | let new_version = doc.get_i64(VERSION_FIELD).unwrap_or(1) as u64; |
920 | 0 | Ok(Some(new_version)) |
921 | | } |
922 | 0 | Ok(None) => Ok(None), |
923 | 0 | Err(e) => Err(make_err!( |
924 | 0 | Code::Internal, |
925 | 0 | "MongoDB error in update_data: {e}" |
926 | 0 | )), |
927 | | } |
928 | | } else { |
929 | 0 | let data_bytes = data.try_into_bytes().map_err(|e| { |
930 | 0 | make_err!( |
931 | 0 | Code::Internal, |
932 | | "Could not convert value to bytes in ExperimentalMongoStore::update_data: {e}" |
933 | | ) |
934 | 0 | })?; |
935 | | |
936 | 0 | let mut doc = doc! { |
937 | 0 | KEY_FIELD: encoded_key.as_ref(), |
938 | 0 | DATA_FIELD: Bson::Binary(mongodb::bson::Binary { |
939 | 0 | subtype: mongodb::bson::spec::BinarySubtype::Generic, |
940 | 0 | bytes: data_bytes.to_vec(), |
941 | 0 | }), |
942 | | }; |
943 | | |
944 | | // Add indexes |
945 | 0 | for (name, value) in maybe_index { |
946 | 0 | doc.insert( |
947 | 0 | name, |
948 | 0 | Bson::Binary(mongodb::bson::Binary { |
949 | 0 | subtype: mongodb::bson::spec::BinarySubtype::Generic, |
950 | 0 | bytes: value.to_vec(), |
951 | 0 | }), |
952 | 0 | ); |
953 | 0 | } |
954 | | |
955 | 0 | self.scheduler_collection |
956 | 0 | .update_one( |
957 | 0 | doc! { KEY_FIELD: encoded_key.as_ref() }, |
958 | 0 | doc! { "$set": doc }, |
959 | 0 | ) |
960 | 0 | .upsert(true) |
961 | 0 | .await |
962 | 0 | .map_err(|e| { |
963 | 0 | make_err!(Code::Internal, "Failed to update scheduler document: {e}") |
964 | 0 | })?; |
965 | | |
966 | 0 | Ok(Some(0)) |
967 | | } |
968 | 0 | } |
969 | | |
970 | 0 | async fn search_by_index_prefix<K>( |
971 | 0 | &self, |
972 | 0 | index: K, |
973 | 0 | ) -> Result< |
974 | 0 | impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send, |
975 | 0 | Error, |
976 | 0 | > |
977 | 0 | where |
978 | 0 | K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send, |
979 | 0 | { |
980 | 0 | let index_value = index.index_value(); |
981 | | |
982 | | // Create index if it doesn't exist |
983 | 0 | let index_name = format!("{}_{}", K::KEY_PREFIX, K::INDEX_NAME); |
984 | 0 | self.scheduler_collection |
985 | 0 | .create_index( |
986 | 0 | IndexModel::builder() |
987 | 0 | .keys(doc! { K::INDEX_NAME: 1 }) |
988 | 0 | .options(IndexOptions::builder().name(index_name).build()) |
989 | 0 | .build(), |
990 | 0 | ) |
991 | 0 | .await |
992 | 0 | .map_err(|e| make_err!(Code::Internal, "Failed to create scheduler index: {e}"))?; |
993 | | |
994 | | // Build filter |
995 | 0 | let filter = doc! { |
996 | | K::INDEX_NAME: { |
997 | 0 | "$regex": format!("^{}", regex::escape(index_value.as_ref())), |
998 | | } |
999 | | }; |
1000 | | |
1001 | | // Add sort if specified |
1002 | 0 | let find_options = if let Some(sort_key) = K::MAYBE_SORT_KEY { Branch (1002:35): [Folded - Ignored]
Branch (1002:35): [True: 0, False: 0]
Branch (1002:35): [True: 0, False: 0]
Branch (1002:35): [Folded - Ignored]
|
1003 | 0 | FindOptions::builder().sort(doc! { sort_key: 1 }).build() |
1004 | | } else { |
1005 | 0 | FindOptions::default() |
1006 | | }; |
1007 | | |
1008 | 0 | let cursor = self |
1009 | 0 | .scheduler_collection |
1010 | 0 | .find(filter) |
1011 | 0 | .with_options(find_options) |
1012 | 0 | .await |
1013 | 0 | .map_err(|e| { |
1014 | 0 | make_err!( |
1015 | 0 | Code::Internal, |
1016 | | "Failed to create cursor in search_by_index_prefix: {e}" |
1017 | | ) |
1018 | 0 | })?; |
1019 | | |
1020 | 0 | Ok(cursor.map(move |result| { |
1021 | 0 | let doc = result.map_err(|e| { |
1022 | 0 | make_err!( |
1023 | 0 | Code::Internal, |
1024 | | "Error reading document in search_by_index_prefix: {e}" |
1025 | | ) |
1026 | 0 | })?; |
1027 | | |
1028 | 0 | let data = match doc.get(DATA_FIELD) { |
1029 | 0 | Some(Bson::Binary(binary)) => Bytes::from(binary.bytes.clone()), |
1030 | | _ => { |
1031 | 0 | return Err(make_err!( |
1032 | 0 | Code::Internal, |
1033 | 0 | "Missing or invalid data field in search_by_index_prefix" |
1034 | 0 | )); |
1035 | | } |
1036 | | }; |
1037 | | |
1038 | 0 | let version = if <K as SchedulerIndexProvider>::Versioned::VALUE { Branch (1038:30): [Folded - Ignored]
Branch (1038:30): [True: 0, False: 0]
Branch (1038:30): [True: 0, False: 0]
Branch (1038:30): [Folded - Ignored]
|
1039 | 0 | doc.get_i64(VERSION_FIELD).unwrap_or(0) as u64 |
1040 | | } else { |
1041 | 0 | 0 |
1042 | | }; |
1043 | | |
1044 | 0 | K::decode(version, data).map_err(|e| { |
1045 | 0 | make_err!( |
1046 | 0 | Code::Internal, |
1047 | | "Failed to decode in search_by_index_prefix: {e}" |
1048 | | ) |
1049 | 0 | }) |
1050 | 0 | })) |
1051 | 0 | } |
1052 | | |
1053 | 0 | async fn get_and_decode<K>( |
1054 | 0 | &self, |
1055 | 0 | key: K, |
1056 | 0 | ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error> |
1057 | 0 | where |
1058 | 0 | K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send, |
1059 | 0 | { |
1060 | 0 | let key = key.get_key(); |
1061 | 0 | let encoded_key = self.encode_key(&key); |
1062 | 0 | let filter = doc! { KEY_FIELD: encoded_key.as_ref() }; |
1063 | | |
1064 | 0 | let doc = self |
1065 | 0 | .scheduler_collection |
1066 | 0 | .find_one(filter) |
1067 | 0 | .await |
1068 | 0 | .map_err(|e| { |
1069 | 0 | make_err!( |
1070 | 0 | Code::Internal, |
1071 | | "Failed to find document in get_and_decode: {e}" |
1072 | | ) |
1073 | 0 | })?; |
1074 | | |
1075 | 0 | let Some(doc) = doc else { Branch (1075:13): [Folded - Ignored]
Branch (1075:13): [True: 0, False: 0]
Branch (1075:13): [Folded - Ignored]
|
1076 | 0 | return Ok(None); |
1077 | | }; |
1078 | | |
1079 | 0 | let data = match doc.get(DATA_FIELD) { |
1080 | 0 | Some(Bson::Binary(binary)) => Bytes::from(binary.bytes.clone()), |
1081 | 0 | _ => return Ok(None), |
1082 | | }; |
1083 | | |
1084 | 0 | let version = if <K as SchedulerStoreKeyProvider>::Versioned::VALUE { Branch (1084:26): [Folded - Ignored]
Branch (1084:26): [True: 0, False: 0]
Branch (1084:26): [Folded - Ignored]
|
1085 | 0 | doc.get_i64(VERSION_FIELD).unwrap_or(0) as u64 |
1086 | | } else { |
1087 | 0 | 0 |
1088 | | }; |
1089 | | |
1090 | 0 | Ok(Some(K::decode(version, data).map_err(|e| { |
1091 | 0 | make_err!(Code::Internal, "Failed to decode in get_and_decode: {e}") |
1092 | 0 | })?)) |
1093 | 0 | } |
1094 | | } |