Coverage Report

Created: 2025-10-30 00:14

/build/source/nativelink-store/src/mongo_store.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::ops::Bound;
17
use core::pin::Pin;
18
use core::time::Duration;
19
use std::borrow::Cow;
20
use std::sync::{Arc, Weak};
21
22
use async_trait::async_trait;
23
use bytes::Bytes;
24
use futures::stream::{Stream, StreamExt, TryStreamExt};
25
use mongodb::bson::{Bson, Document, doc};
26
use mongodb::options::{ClientOptions, FindOptions, IndexOptions, ReturnDocument, WriteConcern};
27
use mongodb::{Client as MongoClient, Collection, Database, IndexModel};
28
use nativelink_config::stores::ExperimentalMongoSpec;
29
use nativelink_error::{Code, Error, make_err, make_input_err};
30
use nativelink_metric::MetricsComponent;
31
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
32
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
33
use nativelink_util::spawn;
34
use nativelink_util::store_trait::{
35
    BoolValue, RemoveItemCallback, SchedulerCurrentVersionProvider, SchedulerIndexProvider,
36
    SchedulerStore, SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
37
    SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,
38
};
39
use nativelink_util::task::JoinHandleDropGuard;
40
use parking_lot::{Mutex, RwLock};
41
use patricia_tree::StringPatriciaMap;
42
use tokio::sync::watch;
43
use tokio::time::sleep;
44
use tracing::{error, info, warn};
45
46
use crate::cas_utils::is_zero_digest;
47
48
/// The default database name if not specified.
49
const DEFAULT_DB_NAME: &str = "nativelink";
50
51
/// The default collection name if not specified.
52
const DEFAULT_COLLECTION_NAME: &str = "cas";
53
54
/// The default scheduler collection name if not specified.
55
const DEFAULT_SCHEDULER_COLLECTION_NAME: &str = "scheduler";
56
57
/// The default size of the read chunk when reading data from `MongoDB`.
58
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;
59
60
/// The default connection timeout in milliseconds if not specified.
61
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;
62
63
/// The default command timeout in milliseconds if not specified.
64
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;
65
66
/// The default maximum number of concurrent uploads.
67
const DEFAULT_MAX_CONCURRENT_UPLOADS: usize = 10;
68
69
/// The name of the field in `MongoDB` documents that stores the key.
70
const KEY_FIELD: &str = "_id";
71
72
/// The name of the field in `MongoDB` documents that stores the data.
73
const DATA_FIELD: &str = "data";
74
75
/// The name of the field in `MongoDB` documents that stores the version.
76
const VERSION_FIELD: &str = "version";
77
78
/// The name of the field in `MongoDB` documents that stores the size.
79
const SIZE_FIELD: &str = "size";
80
81
/// A [`StoreDriver`] implementation that uses `MongoDB` as a backing store.
82
#[derive(Debug, MetricsComponent)]
83
pub struct ExperimentalMongoStore {
84
    /// The `MongoDB` client.
85
    #[allow(dead_code)]
86
    client: MongoClient,
87
88
    /// The database to use.
89
    database: Database,
90
91
    /// The collection for CAS data.
92
    cas_collection: Collection<Document>,
93
94
    /// The collection for scheduler data.
95
    scheduler_collection: Collection<Document>,
96
97
    /// A common prefix to append to all keys before they are sent to `MongoDB`.
98
    #[metric(help = "Prefix to append to all keys before sending to MongoDB")]
99
    key_prefix: String,
100
101
    /// The amount of data to read from `MongoDB` at a time.
102
    #[metric(help = "The amount of data to read from MongoDB at a time")]
103
    read_chunk_size: usize,
104
105
    /// The maximum number of concurrent uploads.
106
    #[metric(help = "The maximum number of concurrent uploads")]
107
    max_concurrent_uploads: usize,
108
109
    /// Enable change streams for real-time updates.
110
    #[metric(help = "Whether change streams are enabled")]
111
    enable_change_streams: bool,
112
113
    /// A manager for subscriptions to keys in `MongoDB`.
114
    subscription_manager: Mutex<Option<Arc<ExperimentalMongoSubscriptionManager>>>,
115
}
116
117
impl ExperimentalMongoStore {
118
    /// Create a new `ExperimentalMongoStore` from the given configuration.
119
3
    pub async fn new(mut spec: ExperimentalMongoSpec) -> Result<Arc<Self>, Error> {
120
        // Set defaults
121
3
        if spec.connection_string.is_empty() {
  Branch (121:12): [True: 0, False: 0]
  Branch (121:12): [True: 1, False: 2]
  Branch (121:12): [Folded - Ignored]
122
1
            return Err(make_err!(
123
1
                Code::InvalidArgument,
124
1
                "No connection string was specified in mongo store configuration."
125
1
            ));
126
2
        }
127
128
2
        if spec.database.is_empty() {
  Branch (128:12): [True: 0, False: 0]
  Branch (128:12): [True: 0, False: 2]
  Branch (128:12): [Folded - Ignored]
129
0
            spec.database = DEFAULT_DB_NAME.to_string();
130
2
        }
131
132
2
        if spec.cas_collection.is_empty() {
  Branch (132:12): [True: 0, False: 0]
  Branch (132:12): [True: 0, False: 2]
  Branch (132:12): [Folded - Ignored]
133
0
            spec.cas_collection = DEFAULT_COLLECTION_NAME.to_string();
134
2
        }
135
136
2
        if spec.scheduler_collection.is_empty() {
  Branch (136:12): [True: 0, False: 0]
  Branch (136:12): [True: 0, False: 2]
  Branch (136:12): [Folded - Ignored]
137
0
            spec.scheduler_collection = DEFAULT_SCHEDULER_COLLECTION_NAME.to_string();
138
2
        }
139
140
2
        if spec.read_chunk_size == 0 {
  Branch (140:12): [True: 0, False: 0]
  Branch (140:12): [True: 0, False: 2]
  Branch (140:12): [Folded - Ignored]
141
0
            spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
142
2
        }
143
144
2
        if spec.connection_timeout_ms == 0 {
  Branch (144:12): [True: 0, False: 0]
  Branch (144:12): [True: 0, False: 2]
  Branch (144:12): [Folded - Ignored]
145
0
            spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS;
146
2
        }
147
148
2
        if spec.command_timeout_ms == 0 {
  Branch (148:12): [True: 0, False: 0]
  Branch (148:12): [True: 0, False: 2]
  Branch (148:12): [Folded - Ignored]
149
0
            spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS;
150
2
        }
151
152
2
        if spec.max_concurrent_uploads == 0 {
  Branch (152:12): [True: 0, False: 0]
  Branch (152:12): [True: 0, False: 2]
  Branch (152:12): [Folded - Ignored]
153
0
            spec.max_concurrent_uploads = DEFAULT_MAX_CONCURRENT_UPLOADS;
154
2
        }
155
156
        // Configure client options
157
2
        let mut client_options = ClientOptions::parse(&spec.connection_string)
158
2
            .await
159
2
            .map_err(|e| {
160
1
                make_err!(
161
1
                    Code::InvalidArgument,
162
                    "Failed to parse MongoDB connection string: {e}"
163
                )
164
1
            })?;
165
166
1
        client_options.server_selection_timeout =
167
1
            Some(Duration::from_millis(spec.connection_timeout_ms));
168
1
        client_options.connect_timeout = Some(Duration::from_millis(spec.connection_timeout_ms));
169
170
        // Set write concern if specified
171
1
        if let Some(w) = spec.write_concern_w {
  Branch (171:16): [True: 0, False: 0]
  Branch (171:16): [True: 0, False: 1]
  Branch (171:16): [Folded - Ignored]
172
            // Parse write concern w - can be a number or string like "majority"
173
0
            let w_value = if let Ok(num) = w.parse::<u32>() {
  Branch (173:34): [True: 0, False: 0]
  Branch (173:34): [True: 0, False: 0]
  Branch (173:34): [Folded - Ignored]
174
0
                Some(num.into())
175
            } else {
176
0
                Some(w.into())
177
            };
178
179
            // Build write concern based on which options are set
180
0
            let write_concern = match (w_value, spec.write_concern_j, spec.write_concern_timeout_ms)
181
            {
182
0
                (Some(w), Some(j), Some(timeout)) => WriteConcern::builder()
183
0
                    .w(Some(w))
184
0
                    .journal(j)
185
0
                    .w_timeout(Some(Duration::from_millis(u64::from(timeout))))
186
0
                    .build(),
187
0
                (Some(w), Some(j), None) => WriteConcern::builder().w(Some(w)).journal(j).build(),
188
0
                (Some(w), None, Some(timeout)) => WriteConcern::builder()
189
0
                    .w(Some(w))
190
0
                    .w_timeout(Some(Duration::from_millis(u64::from(timeout))))
191
0
                    .build(),
192
0
                (Some(w), None, None) => WriteConcern::builder().w(Some(w)).build(),
193
0
                _ => unreachable!(), // We know w is Some because we're in the if let Some(w) block
194
            };
195
196
0
            client_options.write_concern = Some(write_concern);
197
1
        } else if spec.write_concern_j.is_some() || spec.write_concern_timeout_ms.is_some() {
  Branch (197:19): [True: 0, False: 0]
  Branch (197:53): [True: 0, False: 0]
  Branch (197:19): [True: 1, False: 0]
  Branch (197:53): [True: 0, False: 0]
  Branch (197:19): [Folded - Ignored]
  Branch (197:53): [Folded - Ignored]
198
1
            return Err(make_err!(
199
1
                Code::InvalidArgument,
200
1
                "write_concern_w not set, but j and/or timeout set. Please set 'write_concern_w' to a non-default value. See https://www.mongodb.com/docs/manual/reference/write-concern/#w-option for options."
201
1
            ));
202
0
        }
203
204
        // Create client
205
0
        let client = MongoClient::with_options(client_options).map_err(|e| {
206
0
            make_err!(
207
0
                Code::InvalidArgument,
208
                "Failed to create MongoDB client: {e}"
209
            )
210
0
        })?;
211
212
        // Get database and collections
213
0
        let database = client.database(&spec.database);
214
0
        let cas_collection = database.collection::<Document>(&spec.cas_collection);
215
0
        let scheduler_collection = database.collection::<Document>(&spec.scheduler_collection);
216
217
        // Create indexes
218
0
        Self::create_indexes(&cas_collection, &scheduler_collection).await?;
219
220
0
        let store = Self {
221
0
            client,
222
0
            database,
223
0
            cas_collection,
224
0
            scheduler_collection,
225
0
            key_prefix: spec.key_prefix.clone().unwrap_or_default(),
226
0
            read_chunk_size: spec.read_chunk_size,
227
0
            max_concurrent_uploads: spec.max_concurrent_uploads,
228
0
            enable_change_streams: spec.enable_change_streams,
229
0
            subscription_manager: Mutex::new(None),
230
0
        };
231
232
0
        Ok(Arc::new(store))
233
3
    }
234
235
    /// Create necessary indexes for efficient operations.
236
0
    async fn create_indexes(
237
0
        cas_collection: &Collection<Document>,
238
0
        _scheduler_collection: &Collection<Document>,
239
0
    ) -> Result<(), Error> {
240
        // CAS collection indexes
241
0
        cas_collection
242
0
            .create_index(IndexModel::builder().keys(doc! { SIZE_FIELD: 1 }).build())
243
0
            .await
244
0
            .map_err(|e| {
245
0
                make_err!(
246
0
                    Code::Internal,
247
                    "Failed to create size index on CAS collection: {e}"
248
                )
249
0
            })?;
250
251
        // Scheduler collection will have dynamic indexes created as needed
252
253
0
        Ok(())
254
0
    }
255
256
    /// Encode a [`StoreKey`] so it can be sent to `MongoDB`.
257
0
    fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> {
258
0
        let key_body = key.as_str();
259
0
        if self.key_prefix.is_empty() {
  Branch (259:12): [True: 0, False: 0]
  Branch (259:12): [Folded - Ignored]
260
0
            key_body
261
        } else {
262
0
            match key_body {
263
0
                Cow::Owned(mut encoded_key) => {
264
0
                    encoded_key.insert_str(0, &self.key_prefix);
265
0
                    Cow::Owned(encoded_key)
266
                }
267
0
                Cow::Borrowed(body) => {
268
0
                    let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());
269
0
                    encoded_key.push_str(&self.key_prefix);
270
0
                    encoded_key.push_str(body);
271
0
                    Cow::Owned(encoded_key)
272
                }
273
            }
274
        }
275
0
    }
276
277
    /// Decode a key from `MongoDB` by removing the prefix.
278
0
    fn decode_key(&self, key: &str) -> Option<String> {
279
0
        if self.key_prefix.is_empty() {
  Branch (279:12): [True: 0, False: 0]
  Branch (279:12): [Folded - Ignored]
280
0
            Some(key.to_string())
281
        } else {
282
0
            key.strip_prefix(&self.key_prefix).map(ToString::to_string)
283
        }
284
0
    }
285
}
286
287
#[async_trait]
288
impl StoreDriver for ExperimentalMongoStore {
289
    async fn has_with_results(
290
        self: Pin<&Self>,
291
        keys: &[StoreKey<'_>],
292
        results: &mut [Option<u64>],
293
0
    ) -> Result<(), Error> {
294
        for (key, result) in keys.iter().zip(results.iter_mut()) {
295
            // Handle zero digest specially
296
            if is_zero_digest(key.borrow()) {
297
                *result = Some(0);
298
                continue;
299
            }
300
301
            let encoded_key = self.encode_key(key);
302
            let filter = doc! { KEY_FIELD: encoded_key.as_ref() };
303
304
            match self.cas_collection.find_one(filter).await {
305
                Ok(Some(doc)) => {
306
0
                    *result = doc.get_i64(SIZE_FIELD).ok().map(|v| v as u64);
307
                }
308
                Ok(None) => {
309
                    *result = None;
310
                }
311
                Err(e) => {
312
                    return Err(make_err!(
313
                        Code::Internal,
314
                        "MongoDB error in has_with_results: {e}"
315
                    ));
316
                }
317
            }
318
        }
319
        Ok(())
320
0
    }
321
322
    async fn list(
323
        self: Pin<&Self>,
324
        range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
325
        handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
326
0
    ) -> Result<u64, Error> {
327
        let mut filter = Document::new();
328
329
        // Build range query
330
        let mut key_filter = Document::new();
331
        match &range.0 {
332
            Bound::Included(start) => {
333
                let encoded = self.encode_key(start);
334
                key_filter.insert("$gte", encoded.as_ref());
335
            }
336
            Bound::Excluded(start) => {
337
                let encoded = self.encode_key(start);
338
                key_filter.insert("$gt", encoded.as_ref());
339
            }
340
            Bound::Unbounded => {}
341
        }
342
343
        match &range.1 {
344
            Bound::Included(end) => {
345
                let encoded = self.encode_key(end);
346
                key_filter.insert("$lte", encoded.as_ref());
347
            }
348
            Bound::Excluded(end) => {
349
                let encoded = self.encode_key(end);
350
                key_filter.insert("$lt", encoded.as_ref());
351
            }
352
            Bound::Unbounded => {}
353
        }
354
355
        if !key_filter.is_empty() {
356
            filter.insert(KEY_FIELD, key_filter);
357
        }
358
359
        // Add prefix filter if needed
360
        if !self.key_prefix.is_empty() {
361
            let regex_filter = doc! {
362
                KEY_FIELD: {
363
                    "$regex": format!("^{}", regex::escape(&self.key_prefix)),
364
                }
365
            };
366
            if filter.is_empty() {
367
                filter = regex_filter;
368
            } else {
369
                filter = doc! { "$and": [filter, regex_filter] };
370
            }
371
        }
372
373
        let mut cursor = self
374
            .cas_collection
375
            .find(filter)
376
            .projection(doc! { KEY_FIELD: 1 })
377
            .await
378
0
            .map_err(|e| make_err!(Code::Internal, "Failed to create cursor in list: {e}"))?;
379
380
        let mut count = 0u64;
381
        while let Some(doc) = cursor
382
            .try_next()
383
            .await
384
0
            .map_err(|e| make_err!(Code::Internal, "Failed to get next document in list: {e}"))?
385
        {
386
            if let Ok(key) = doc.get_str(KEY_FIELD) {
387
                if let Some(decoded_key) = self.decode_key(key) {
388
                    let store_key = StoreKey::new_str(&decoded_key);
389
                    count += 1;
390
                    if !handler(&store_key) {
391
                        break;
392
                    }
393
                }
394
            }
395
        }
396
397
        Ok(count)
398
0
    }
399
400
    async fn update(
401
        self: Pin<&Self>,
402
        key: StoreKey<'_>,
403
        mut reader: DropCloserReadHalf,
404
        upload_size: UploadSizeInfo,
405
0
    ) -> Result<(), Error> {
406
        let encoded_key = self.encode_key(&key);
407
408
        // Handle zero digest
409
        if is_zero_digest(key.borrow()) {
410
0
            let chunk = reader.peek().await.map_err(|e| {
411
0
                make_err!(
412
0
                    Code::Internal,
413
                    "Failed to peek in ExperimentalMongoStore::update: {e}"
414
                )
415
0
            })?;
416
            if chunk.is_empty() {
417
0
                reader.drain().await.map_err(|e| {
418
0
                    make_err!(
419
0
                        Code::Internal,
420
                        "Failed to drain in ExperimentalMongoStore::update: {e}"
421
                    )
422
0
                })?;
423
                return Ok(());
424
            }
425
        }
426
427
        // Special handling for MaxSize(0) - this should error if stream is closed
428
        if upload_size == UploadSizeInfo::MaxSize(0) {
429
            // Try to read from the stream - if it's closed immediately, this should error
430
            match reader.recv().await {
431
                Ok(_chunk) => {
432
                    return Err(make_input_err!(
433
                        "Received data when MaxSize is 0 in MongoDB store"
434
                    ));
435
                }
436
                Err(e) => {
437
                    return Err(make_err!(
438
                        Code::InvalidArgument,
439
                        "Stream closed for MaxSize(0) upload: {e}"
440
                    ));
441
                }
442
            }
443
        }
444
445
        // Read all data into memory with proper EOF handling
446
        let mut data = Vec::new();
447
        while let Ok(chunk) = reader.recv().await {
448
            if chunk.is_empty() {
449
                break; // Empty chunk signals EOF
450
            }
451
            data.extend_from_slice(&chunk);
452
        }
453
454
        let size = data.len() as i64;
455
456
        // Create document
457
        let doc = doc! {
458
            KEY_FIELD: encoded_key.as_ref(),
459
            DATA_FIELD: Bson::Binary(mongodb::bson::Binary {
460
                subtype: mongodb::bson::spec::BinarySubtype::Generic,
461
                bytes: data,
462
            }),
463
            SIZE_FIELD: size,
464
        };
465
466
        // Upsert the document
467
        self.cas_collection
468
            .update_one(
469
                doc! { KEY_FIELD: encoded_key.as_ref() },
470
                doc! { "$set": doc },
471
            )
472
            .upsert(true)
473
            .await
474
0
            .map_err(|e| make_err!(Code::Internal, "Failed to update document in MongoDB: {e}"))?;
475
476
        Ok(())
477
0
    }
478
479
    async fn get_part(
480
        self: Pin<&Self>,
481
        key: StoreKey<'_>,
482
        writer: &mut DropCloserWriteHalf,
483
        offset: u64,
484
        length: Option<u64>,
485
0
    ) -> Result<(), Error> {
486
        // Handle zero digest
487
        if is_zero_digest(key.borrow()) {
488
0
            return writer.send_eof().map_err(|e| {
489
0
                make_err!(
490
0
                    Code::Internal,
491
                    "Failed to send zero EOF in mongo store get_part: {e}"
492
                )
493
0
            });
494
        }
495
496
        let encoded_key = self.encode_key(&key);
497
        let filter = doc! { KEY_FIELD: encoded_key.as_ref() };
498
499
        let doc = self
500
            .cas_collection
501
            .find_one(filter)
502
            .await
503
0
            .map_err(|e| make_err!(Code::Internal, "Failed to find document in get_part: {e}"))?
504
0
            .ok_or_else(|| {
505
0
                make_err!(
506
0
                    Code::NotFound,
507
                    "Data not found in MongoDB store for digest: {key:?}"
508
                )
509
0
            })?;
510
511
        let data = match doc.get(DATA_FIELD) {
512
            Some(Bson::Binary(binary)) => &binary.bytes,
513
            _ => {
514
                return Err(make_err!(
515
                    Code::Internal,
516
                    "Invalid data field in MongoDB document"
517
                ));
518
            }
519
        };
520
521
        let offset = usize::try_from(offset).unwrap_or(usize::MAX);
522
        let data_len = data.len();
523
524
        if offset > data_len {
525
0
            return writer.send_eof().map_err(|e| {
526
0
                make_err!(
527
0
                    Code::Internal,
528
                    "Failed to send EOF in mongo store get_part: {e}"
529
                )
530
0
            });
531
        }
532
533
        let end = if let Some(len) = length {
534
            cmp::min(
535
                offset.saturating_add(usize::try_from(len).unwrap_or(usize::MAX)),
536
                data_len,
537
            )
538
        } else {
539
            data_len
540
        };
541
542
        if offset < end {
543
            let chunk = &data[offset..end];
544
545
            // Send data in chunks
546
            for chunk_data in chunk.chunks(self.read_chunk_size) {
547
0
                writer.send(chunk_data.to_vec().into()).await.map_err(|e| {
548
0
                    make_err!(
549
0
                        Code::Internal,
550
                        "Failed to write data in ExperimentalMongoStore::get_part: {e}"
551
                    )
552
0
                })?;
553
            }
554
        }
555
556
0
        writer.send_eof().map_err(|e| {
557
0
            make_err!(
558
0
                Code::Internal,
559
                "Failed to write EOF in mongo store get_part: {e}"
560
            )
561
0
        })
562
0
    }
563
564
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
565
0
        self
566
0
    }
567
568
0
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) {
569
0
        self
570
0
    }
571
572
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> {
573
0
        self
574
0
    }
575
576
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
577
0
        registry.register_indicator(self);
578
0
    }
579
580
0
    fn register_remove_callback(
581
0
        self: Arc<Self>,
582
0
        _callback: Arc<dyn RemoveItemCallback>,
583
0
    ) -> Result<(), Error> {
584
        // No-op: this store never removes items from MongoDB, so the callback is dropped.
585
0
        Ok(())
586
0
    }
587
}
588
589
#[async_trait]
590
impl HealthStatusIndicator for ExperimentalMongoStore {
591
0
    fn get_name(&self) -> &'static str {
592
0
        "ExperimentalMongoStore"
593
0
    }
594
595
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
596
        match self.database.run_command(doc! { "ping": 1 }).await {
597
            Ok(_) => HealthStatus::new_ok(self, "Connection healthy".into()),
598
            Err(e) => HealthStatus::new_failed(
599
                self,
600
                format!("{namespace} - MongoDB connection error: {e}").into(),
601
            ),
602
        }
603
0
    }
604
}
605
606
// -------------------------------------------------------------------
607
//
608
// `ExperimentalMongoDB` scheduler implementation. Likely to change
609
// -------------------------------------------------------------------
610
611
/// An individual subscription to a key in `ExperimentalMongoDB`.
612
#[derive(Debug)]
613
pub struct ExperimentalMongoSubscription {
614
    receiver: Option<watch::Receiver<String>>,
615
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<ExperimentalMongoSubscriptionPublisher>>>,
616
}
617
618
impl SchedulerSubscription for ExperimentalMongoSubscription {
619
0
    async fn changed(&mut self) -> Result<(), Error> {
620
0
        let receiver = self.receiver.as_mut().ok_or_else(|| {
621
0
            make_err!(
622
0
                Code::Internal,
623
                "In ExperimentalMongoSubscription::changed::as_mut"
624
            )
625
0
        })?;
626
0
        receiver.changed().await.map_err(|_| {
627
0
            make_err!(
628
0
                Code::Internal,
629
                "In ExperimentalMongoSubscription::changed::changed"
630
            )
631
0
        })
632
0
    }
633
}
634
635
impl Drop for ExperimentalMongoSubscription {
636
0
    fn drop(&mut self) {
637
0
        let Some(receiver) = self.receiver.take() else {
  Branch (637:13): [True: 0, False: 0]
  Branch (637:13): [Folded - Ignored]
638
0
            warn!("ExperimentalMongoSubscription has already been dropped, nothing to do.");
639
0
            return;
640
        };
641
0
        let key = receiver.borrow().clone();
642
0
        drop(receiver);
643
644
0
        let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else {
  Branch (644:13): [True: 0, False: 0]
  Branch (644:13): [Folded - Ignored]
645
0
            return;
646
        };
647
0
        let mut subscribed_keys = subscribed_keys.write();
648
0
        let Some(value) = subscribed_keys.get(&key) else {
  Branch (648:13): [True: 0, False: 0]
  Branch (648:13): [Folded - Ignored]
649
0
            error!(
650
0
                "Key {key} was not found in subscribed keys when checking if it should be removed."
651
            );
652
0
            return;
653
        };
654
655
0
        if value.receiver_count() == 0 {
  Branch (655:12): [True: 0, False: 0]
  Branch (655:12): [Folded - Ignored]
656
0
            subscribed_keys.remove(key);
657
0
        }
658
0
    }
659
}
660
661
/// A publisher for a key in `ExperimentalMongoDB`.
662
#[derive(Debug)]
663
struct ExperimentalMongoSubscriptionPublisher {
664
    sender: Mutex<watch::Sender<String>>,
665
}
666
667
impl ExperimentalMongoSubscriptionPublisher {
668
0
    fn new(
669
0
        key: String,
670
0
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
671
0
    ) -> (Self, ExperimentalMongoSubscription) {
672
0
        let (sender, receiver) = watch::channel(key);
673
0
        let publisher = Self {
674
0
            sender: Mutex::new(sender),
675
0
        };
676
0
        let subscription = ExperimentalMongoSubscription {
677
0
            receiver: Some(receiver),
678
0
            weak_subscribed_keys,
679
0
        };
680
0
        (publisher, subscription)
681
0
    }
682
683
0
    fn subscribe(
684
0
        &self,
685
0
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
686
0
    ) -> ExperimentalMongoSubscription {
687
0
        let receiver = self.sender.lock().subscribe();
688
0
        ExperimentalMongoSubscription {
689
0
            receiver: Some(receiver),
690
0
            weak_subscribed_keys,
691
0
        }
692
0
    }
693
694
0
    fn receiver_count(&self) -> usize {
695
0
        self.sender.lock().receiver_count()
696
0
    }
697
698
0
    fn notify(&self) {
699
0
        self.sender.lock().send_modify(|_| {});
700
0
    }
701
}
702
703
#[derive(Debug)]
704
pub struct ExperimentalMongoSubscriptionManager {
705
    subscribed_keys: Arc<RwLock<StringPatriciaMap<ExperimentalMongoSubscriptionPublisher>>>,
706
    _subscription_spawn: JoinHandleDropGuard<()>,
707
}
708
709
impl ExperimentalMongoSubscriptionManager {
710
0
    pub fn new(database: Database, collection_name: String, key_prefix: String) -> Self {
711
0
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
712
0
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);
713
714
        Self {
715
0
            subscribed_keys,
716
0
            _subscription_spawn: spawn!("mongo_subscribe_spawn", async move {
717
0
                let collection = database.collection::<Document>(&collection_name);
718
719
                loop {
720
                    // Try to create change stream
721
0
                    let change_stream_result = collection.watch().await;
722
723
0
                    match change_stream_result {
724
0
                        Ok(mut change_stream) => {
725
0
                            info!("MongoDB change stream connected");
726
727
0
                            while let Some(event_result) = change_stream.next().await {
  Branch (727:39): [True: 0, False: 0]
  Branch (727:39): [Folded - Ignored]
728
0
                                match event_result {
729
0
                                    Ok(event) => {
730
                                        use mongodb::change_stream::event::OperationType;
731
0
                                        match event.operation_type {
732
                                            OperationType::Insert
733
                                            | OperationType::Update
734
                                            | OperationType::Replace
735
                                            | OperationType::Delete => {
736
0
                                                if let Some(doc_key) = event.document_key {
  Branch (736:56): [True: 0, False: 0]
  Branch (736:56): [Folded - Ignored]
737
0
                                                    if let Ok(key) = doc_key.get_str(KEY_FIELD) {
  Branch (737:60): [True: 0, False: 0]
  Branch (737:60): [Folded - Ignored]
738
                                                        // Remove prefix if present
739
0
                                                        let key = if key_prefix.is_empty() {
  Branch (739:70): [True: 0, False: 0]
  Branch (739:70): [Folded - Ignored]
740
0
                                                            key
741
                                                        } else {
742
0
                                                            key.strip_prefix(&key_prefix)
743
0
                                                                .unwrap_or(key)
744
                                                        };
745
746
0
                                                        let Some(subscribed_keys) =
  Branch (746:61): [True: 0, False: 0]
  Branch (746:61): [Folded - Ignored]
747
0
                                                            subscribed_keys_weak.upgrade()
748
                                                        else {
749
0
                                                            warn!(
750
0
                                                                "Parent dropped, exiting ExperimentalMongoSubscriptionManager"
751
                                                            );
752
0
                                                            return;
753
                                                        };
754
755
0
                                                        let subscribed_keys_mux =
756
0
                                                            subscribed_keys.read();
757
0
                                                        subscribed_keys_mux
758
0
                                                                .common_prefix_values(key)
759
0
                                                                .for_each(ExperimentalMongoSubscriptionPublisher::notify);
760
0
                                                    }
761
0
                                                }
762
                                            }
763
0
                                            _ => {}
764
                                        }
765
                                    }
766
0
                                    Err(e) => {
767
0
                                        error!("Error in change stream: {e}");
768
0
                                        break;
769
                                    }
770
                                }
771
                            }
772
                        }
773
0
                        Err(e) => {
774
0
                            warn!("Failed to create change stream: {e}. Will retry in 5 seconds.");
775
                        }
776
                    }
777
778
                    // Check if parent is still alive
779
0
                    if subscribed_keys_weak.upgrade().is_none() {
  Branch (779:24): [True: 0, False: 0]
  Branch (779:24): [Folded - Ignored]
780
0
                        warn!("Parent dropped, exiting ExperimentalMongoSubscriptionManager");
781
0
                        return;
782
0
                    }
783
784
                    // Sleep before retry
785
0
                    sleep(Duration::from_secs(5)).await;
786
787
                    // Notify all subscribers on reconnection
788
0
                    if let Some(subscribed_keys) = subscribed_keys_weak.upgrade() {
  Branch (788:28): [True: 0, False: 0]
  Branch (788:28): [Folded - Ignored]
789
0
                        let subscribed_keys_mux = subscribed_keys.read();
790
0
                        for publisher in subscribed_keys_mux.values() {
791
0
                            publisher.notify();
792
0
                        }
793
0
                    }
794
                }
795
0
            }),
796
        }
797
0
    }
798
}
799
800
impl SchedulerSubscriptionManager for ExperimentalMongoSubscriptionManager {
801
    type Subscription = ExperimentalMongoSubscription;
802
803
0
    fn notify_for_test(&self, value: String) {
804
0
        let subscribed_keys_mux = self.subscribed_keys.read();
805
0
        subscribed_keys_mux
806
0
            .common_prefix_values(&value)
807
0
            .for_each(ExperimentalMongoSubscriptionPublisher::notify);
808
0
    }
809
810
0
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
811
0
    where
812
0
        K: SchedulerStoreKeyProvider,
813
    {
814
0
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
815
0
        let mut subscribed_keys = self.subscribed_keys.write();
816
0
        let key = key.get_key();
817
0
        let key_str = key.as_str();
818
819
0
        let mut subscription = if let Some(publisher) = subscribed_keys.get(&key_str) {
  Branch (819:39): [Folded - Ignored]
  Branch (819:39): [Folded - Ignored]
820
0
            publisher.subscribe(weak_subscribed_keys)
821
        } else {
822
0
            let (publisher, subscription) = ExperimentalMongoSubscriptionPublisher::new(
823
0
                key_str.to_string(),
824
0
                weak_subscribed_keys,
825
0
            );
826
0
            subscribed_keys.insert(key_str, publisher);
827
0
            subscription
828
        };
829
830
0
        subscription
831
0
            .receiver
832
0
            .as_mut()
833
0
            .ok_or_else(|| {
834
0
                make_err!(
835
0
                    Code::Internal,
836
                    "Receiver should be set in ExperimentalMongoSubscriptionManager::subscribe"
837
                )
838
0
            })?
839
0
            .mark_changed();
840
841
0
        Ok(subscription)
842
0
    }
843
844
0
    fn is_reliable() -> bool {
845
0
        true
846
0
    }
847
}
848
849
impl SchedulerStore for ExperimentalMongoStore {
850
    type SubscriptionManager = ExperimentalMongoSubscriptionManager;
851
852
0
    fn subscription_manager(&self) -> Result<Arc<ExperimentalMongoSubscriptionManager>, Error> {
853
0
        let mut subscription_manager = self.subscription_manager.lock();
854
0
        if let Some(subscription_manager) = &*subscription_manager {
  Branch (854:16): [True: 0, False: 0]
  Branch (854:16): [Folded - Ignored]
855
0
            Ok(subscription_manager.clone())
856
        } else {
857
0
            if !self.enable_change_streams {
  Branch (857:16): [True: 0, False: 0]
  Branch (857:16): [Folded - Ignored]
858
0
                return Err(make_input_err!(
859
0
                    "ExperimentalMongoStore must have change streams enabled for scheduler subscriptions"
860
0
                ));
861
0
            }
862
863
0
            let sub = Arc::new(ExperimentalMongoSubscriptionManager::new(
864
0
                self.database.clone(),
865
0
                self.scheduler_collection.name().to_string(),
866
0
                self.key_prefix.clone(),
867
            ));
868
0
            *subscription_manager = Some(sub.clone());
869
0
            Ok(sub)
870
        }
871
0
    }
872
873
0
    async fn update_data<T>(&self, data: T) -> Result<Option<i64>, Error>
874
0
    where
875
0
        T: SchedulerStoreDataProvider
876
0
            + SchedulerStoreKeyProvider
877
0
            + SchedulerCurrentVersionProvider
878
0
            + Send,
879
0
    {
880
0
        let key = data.get_key();
881
0
        let encoded_key = self.encode_key(&key);
882
0
        let maybe_index = data.get_indexes().map_err(|e| {
883
0
            make_err!(
884
0
                Code::Internal,
885
                "Error getting indexes in ExperimentalMongoStore::update_data: {e}"
886
            )
887
0
        })?;
888
889
0
        if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {
  Branch (889:12): [Folded - Ignored]
  Branch (889:12): [True: 0, False: 0]
  Branch (889:12): [Folded - Ignored]
890
0
            let current_version = data.current_version();
891
0
            let data_bytes = data.try_into_bytes().map_err(|e| {
892
0
                make_err!(
893
0
                    Code::Internal,
894
                    "Could not convert value to bytes in ExperimentalMongoStore::update_data: {e}"
895
                )
896
0
            })?;
897
898
0
            let mut update_doc = doc! {
899
                "$set": {
900
0
                    DATA_FIELD: Bson::Binary(mongodb::bson::Binary {
901
0
                        subtype: mongodb::bson::spec::BinarySubtype::Generic,
902
0
                        bytes: data_bytes.to_vec(),
903
0
                    }),
904
                },
905
                "$inc": {
906
                    VERSION_FIELD: 1i64,
907
                }
908
            };
909
910
            // Add indexes
911
0
            for (name, value) in maybe_index {
912
0
                update_doc.get_document_mut("$set").unwrap().insert(
913
0
                    name,
914
0
                    Bson::Binary(mongodb::bson::Binary {
915
0
                        subtype: mongodb::bson::spec::BinarySubtype::Generic,
916
0
                        bytes: value.to_vec(),
917
0
                    }),
918
0
                );
919
0
            }
920
921
0
            let filter = doc! {
922
0
                KEY_FIELD: encoded_key.as_ref(),
923
0
                VERSION_FIELD: current_version,
924
            };
925
926
0
            match self
927
0
                .scheduler_collection
928
0
                .find_one_and_update(filter, update_doc)
929
0
                .upsert(true)
930
0
                .return_document(ReturnDocument::After)
931
0
                .await
932
            {
933
0
                Ok(Some(doc)) => Ok(doc.get_i64(VERSION_FIELD).ok().or(Some(1i64))),
934
0
                Ok(None) => Ok(None),
935
0
                Err(e) => Err(make_err!(
936
0
                    Code::Internal,
937
0
                    "MongoDB error in update_data: {e}"
938
0
                )),
939
            }
940
        } else {
941
0
            let data_bytes = data.try_into_bytes().map_err(|e| {
942
0
                make_err!(
943
0
                    Code::Internal,
944
                    "Could not convert value to bytes in ExperimentalMongoStore::update_data: {e}"
945
                )
946
0
            })?;
947
948
0
            let mut doc = doc! {
949
0
                KEY_FIELD: encoded_key.as_ref(),
950
0
                DATA_FIELD: Bson::Binary(mongodb::bson::Binary {
951
0
                    subtype: mongodb::bson::spec::BinarySubtype::Generic,
952
0
                    bytes: data_bytes.to_vec(),
953
0
                }),
954
            };
955
956
            // Add indexes
957
0
            for (name, value) in maybe_index {
958
0
                doc.insert(
959
0
                    name,
960
0
                    Bson::Binary(mongodb::bson::Binary {
961
0
                        subtype: mongodb::bson::spec::BinarySubtype::Generic,
962
0
                        bytes: value.to_vec(),
963
0
                    }),
964
0
                );
965
0
            }
966
967
0
            self.scheduler_collection
968
0
                .update_one(
969
0
                    doc! { KEY_FIELD: encoded_key.as_ref() },
970
0
                    doc! { "$set": doc },
971
0
                )
972
0
                .upsert(true)
973
0
                .await
974
0
                .map_err(|e| {
975
0
                    make_err!(Code::Internal, "Failed to update scheduler document: {e}")
976
0
                })?;
977
978
0
            Ok(Some(0))
979
        }
980
0
    }
981
982
0
    async fn search_by_index_prefix<K>(
983
0
        &self,
984
0
        index: K,
985
0
    ) -> Result<
986
0
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
987
0
        Error,
988
0
    >
989
0
    where
990
0
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
991
0
    {
992
0
        let index_value = index.index_value();
993
994
        // Create index if it doesn't exist
995
0
        let index_name = format!("{}_{}", K::KEY_PREFIX, K::INDEX_NAME);
996
0
        self.scheduler_collection
997
0
            .create_index(
998
0
                IndexModel::builder()
999
0
                    .keys(doc! { K::INDEX_NAME: 1 })
1000
0
                    .options(IndexOptions::builder().name(index_name).build())
1001
0
                    .build(),
1002
0
            )
1003
0
            .await
1004
0
            .map_err(|e| make_err!(Code::Internal, "Failed to create scheduler index: {e}"))?;
1005
1006
        // Build filter
1007
0
        let filter = doc! {
1008
            K::INDEX_NAME: {
1009
0
                "$regex": format!("^{}", regex::escape(index_value.as_ref())),
1010
            }
1011
        };
1012
1013
        // Add sort if specified
1014
0
        let find_options = if let Some(sort_key) = K::MAYBE_SORT_KEY {
  Branch (1014:35): [Folded - Ignored]
  Branch (1014:35): [True: 0, False: 0]
  Branch (1014:35): [True: 0, False: 0]
  Branch (1014:35): [Folded - Ignored]
1015
0
            FindOptions::builder().sort(doc! { sort_key: 1 }).build()
1016
        } else {
1017
0
            FindOptions::default()
1018
        };
1019
1020
0
        let cursor = self
1021
0
            .scheduler_collection
1022
0
            .find(filter)
1023
0
            .with_options(find_options)
1024
0
            .await
1025
0
            .map_err(|e| {
1026
0
                make_err!(
1027
0
                    Code::Internal,
1028
                    "Failed to create cursor in search_by_index_prefix: {e}"
1029
                )
1030
0
            })?;
1031
1032
0
        Ok(cursor.map(move |result| {
1033
0
            let doc = result.map_err(|e| {
1034
0
                make_err!(
1035
0
                    Code::Internal,
1036
                    "Error reading document in search_by_index_prefix: {e}"
1037
                )
1038
0
            })?;
1039
1040
0
            let data = match doc.get(DATA_FIELD) {
1041
0
                Some(Bson::Binary(binary)) => Bytes::from(binary.bytes.clone()),
1042
                _ => {
1043
0
                    return Err(make_err!(
1044
0
                        Code::Internal,
1045
0
                        "Missing or invalid data field in search_by_index_prefix"
1046
0
                    ));
1047
                }
1048
            };
1049
1050
0
            let version = if <K as SchedulerIndexProvider>::Versioned::VALUE {
  Branch (1050:30): [Folded - Ignored]
  Branch (1050:30): [True: 0, False: 0]
  Branch (1050:30): [True: 0, False: 0]
  Branch (1050:30): [Folded - Ignored]
1051
0
                doc.get_i64(VERSION_FIELD).unwrap_or(0)
1052
            } else {
1053
0
                0
1054
            };
1055
1056
0
            K::decode(version, data).map_err(|e| {
1057
0
                make_err!(
1058
0
                    Code::Internal,
1059
                    "Failed to decode in search_by_index_prefix: {e}"
1060
                )
1061
0
            })
1062
0
        }))
1063
0
    }
1064
1065
0
    async fn get_and_decode<K>(
1066
0
        &self,
1067
0
        key: K,
1068
0
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
1069
0
    where
1070
0
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
1071
0
    {
1072
0
        let key = key.get_key();
1073
0
        let encoded_key = self.encode_key(&key);
1074
0
        let filter = doc! { KEY_FIELD: encoded_key.as_ref() };
1075
1076
0
        let doc = self
1077
0
            .scheduler_collection
1078
0
            .find_one(filter)
1079
0
            .await
1080
0
            .map_err(|e| {
1081
0
                make_err!(
1082
0
                    Code::Internal,
1083
                    "Failed to find document in get_and_decode: {e}"
1084
                )
1085
0
            })?;
1086
1087
0
        let Some(doc) = doc else {
  Branch (1087:13): [Folded - Ignored]
  Branch (1087:13): [True: 0, False: 0]
  Branch (1087:13): [Folded - Ignored]
1088
0
            return Ok(None);
1089
        };
1090
1091
0
        let data = match doc.get(DATA_FIELD) {
1092
0
            Some(Bson::Binary(binary)) => Bytes::from(binary.bytes.clone()),
1093
0
            _ => return Ok(None),
1094
        };
1095
1096
0
        let version = if <K as SchedulerStoreKeyProvider>::Versioned::VALUE {
  Branch (1096:26): [Folded - Ignored]
  Branch (1096:26): [True: 0, False: 0]
  Branch (1096:26): [Folded - Ignored]
1097
0
            doc.get_i64(VERSION_FIELD).unwrap_or(0)
1098
        } else {
1099
0
            0
1100
        };
1101
1102
0
        Ok(Some(K::decode(version, data).map_err(|e| {
1103
0
            make_err!(Code::Internal, "Failed to decode in get_and_decode: {e}")
1104
0
        })?))
1105
0
    }
1106
}
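
Coverage note: the default-filling branches in `ExperimentalMongoStore::new` (source lines 128-154) and the client/collection/index setup that follows (lines 205-232) all report zero executions above; only the error paths (empty connection string, unparseable connection string, write-concern misconfiguration) are exercised. A minimal sketch of a test that would cover the happy path is below. It assumes `ExperimentalMongoSpec` implements `Default`, that the test sits in a `#[cfg(test)]` child module of `mongo_store.rs` (so the private `read_chunk_size` field is visible), and that a disposable MongoDB server is reachable on localhost; `create_indexes` inside `new` issues a real `createIndexes` command, so this is an integration test rather than a pure unit test.

    // Sketch only: field names beyond `connection_string` and the
    // presence of a Default impl are assumptions, not facts taken
    // from this report.
    #[tokio::test]
    async fn new_fills_defaults_for_empty_fields() -> Result<(), Error> {
        let spec = ExperimentalMongoSpec {
            connection_string: "mongodb://localhost:27017".to_string(),
            // Leave every other field empty/zero so the currently
            // uncovered defaulting branches (lines 128-154) are taken.
            ..ExperimentalMongoSpec::default()
        };
        let store = ExperimentalMongoStore::new(spec).await?;
        // read_chunk_size == 0 in the spec should have been replaced
        // with the 64 KiB default.
        assert_eq!(store.read_chunk_size, DEFAULT_READ_CHUNK_SIZE);
        Ok(())
    }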