Coverage Report

Created: 2026-04-14 11:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/mongo_store.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::ops::Bound;
17
use core::pin::Pin;
18
use core::sync::atomic::{AtomicUsize, Ordering};
19
use core::time::Duration;
20
use std::borrow::Cow;
21
use std::sync::{Arc, Weak};
22
23
use async_trait::async_trait;
24
use bytes::Bytes;
25
use futures::stream::{Stream, StreamExt, TryStreamExt};
26
use mongodb::bson::{Bson, Document, doc};
27
use mongodb::options::{ClientOptions, FindOptions, IndexOptions, ReturnDocument, WriteConcern};
28
use mongodb::{Client as MongoClient, Collection, Database, IndexModel};
29
use nativelink_config::stores::ExperimentalMongoSpec;
30
use nativelink_error::{Code, Error, make_err, make_input_err};
31
use nativelink_metric::MetricsComponent;
32
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
33
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
34
use nativelink_util::spawn;
35
use nativelink_util::store_trait::{
36
    BoolValue, RemoveItemCallback, SchedulerCurrentVersionProvider, SchedulerIndexProvider,
37
    SchedulerStore, SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
38
    SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,
39
};
40
use nativelink_util::task::JoinHandleDropGuard;
41
use parking_lot::{Mutex, RwLock};
42
use patricia_tree::StringPatriciaMap;
43
use tokio::sync::{Semaphore, SemaphorePermit, watch};
44
use tokio::time::sleep;
45
use tracing::{error, info, trace, warn};
46
47
use crate::cas_utils::is_zero_digest;
48
49
/// The default database name if not specified.
50
const DEFAULT_DB_NAME: &str = "nativelink";
51
52
/// The default collection name if not specified.
53
const DEFAULT_COLLECTION_NAME: &str = "cas";
54
55
/// The default scheduler collection name if not specified.
56
const DEFAULT_SCHEDULER_COLLECTION_NAME: &str = "scheduler";
57
58
/// The default size of the read chunk when reading data from `MongoDB`.
59
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;
60
61
/// The default connection timeout in milliseconds if not specified.
62
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;
63
64
/// The default command timeout in milliseconds if not specified.
65
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;
66
67
/// The name of the field in `MongoDB` documents that stores the key.
68
const KEY_FIELD: &str = "_id";
69
70
/// The name of the field in `MongoDB` documents that stores the data.
71
const DATA_FIELD: &str = "data";
72
73
/// The name of the field in `MongoDB` documents that stores the version.
74
const VERSION_FIELD: &str = "version";
75
76
/// The name of the field in `MongoDB` documents that stores the size.
77
const SIZE_FIELD: &str = "size";
78
79
/// A [`StoreDriver`] implementation that uses `MongoDB` as a backing store.
80
#[derive(Debug, MetricsComponent)]
81
pub struct ExperimentalMongoStore {
82
    /// The `MongoDB` client.
83
    #[allow(dead_code)]
84
    client: MongoClient,
85
86
    /// The database to use.
87
    database: Database,
88
89
    /// The collection for CAS data.
90
    cas_collection: Collection<Document>,
91
92
    /// The collection for scheduler data.
93
    scheduler_collection: Collection<Document>,
94
95
    /// A common prefix to append to all keys before they are sent to `MongoDB`.
96
    #[metric(help = "Prefix to append to all keys before sending to MongoDB")]
97
    key_prefix: String,
98
99
    /// The amount of data to read from `MongoDB` at a time.
100
    #[metric(help = "The amount of data to read from MongoDB at a time")]
101
    read_chunk_size: usize,
102
103
    /// Enable change streams for real-time updates.
104
    #[metric(help = "Whether change streams are enabled")]
105
    enable_change_streams: bool,
106
107
    /// A manager for subscriptions to keys in `MongoDB`.
108
    subscription_manager: Mutex<Option<Arc<ExperimentalMongoSubscriptionManager>>>,
109
110
    /// Limits the number of requests at any one time
111
    request_permits: Arc<Semaphore>,
112
113
    /// Keep track of the `request_permits` queue size
114
    waiting_permits: Arc<AtomicUsize>,
115
}
116
117
impl ExperimentalMongoStore {
118
    /// Create a new `ExperimentalMongoStore` from the given configuration.
119
17
    pub async fn new(mut spec: ExperimentalMongoSpec) -> Result<Arc<Self>, Error> {
120
        // Set defaults
121
17
        if spec.connection_string.is_empty() {
122
1
            return Err(make_err!(
123
1
                Code::InvalidArgument,
124
1
                "No connection string was specified in mongo store configuration."
125
1
            ));
126
16
        }
127
128
16
        if spec.database.is_empty() {
129
1
            spec.database = DEFAULT_DB_NAME.to_string();
130
15
        }
131
132
16
        if spec.cas_collection.is_empty() {
133
1
            spec.cas_collection = DEFAULT_COLLECTION_NAME.to_string();
134
15
        }
135
136
16
        if spec.scheduler_collection.is_empty() {
137
1
            spec.scheduler_collection = DEFAULT_SCHEDULER_COLLECTION_NAME.to_string();
138
15
        }
139
140
16
        if spec.read_chunk_size == 0 {
141
1
            spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
142
15
        }
143
144
16
        if spec.connection_timeout_ms == 0 {
145
1
            spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS;
146
15
        }
147
148
16
        if spec.command_timeout_ms == 0 {
149
1
            spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS;
150
15
        }
151
152
16
        if spec.max_concurrent_uploads != 0 {
153
15
            warn!(
154
                "max_concurrent_uploads was set for Mongo, and it's a deprecated value we don't use anymore"
155
            );
156
1
        }
157
158
16
        if let Some(max_permits) = spec.max_requests  [region count: 2]
159
2
            && max_permits == 0
160
        {
161
1
            return Err(make_err!(
162
1
                Code::InvalidArgument,
163
1
                "max_request_permits was set to zero, which will block mongo_store from working at all"
164
1
            ));
165
15
        }
166
167
        // Configure client options
168
15
        let mut client_options = ClientOptions::parse(&spec.connection_string)  [region count: 14]
169
15
            .await
170
15
            .map_err(|e| {  [region count: 1]
171
1
                make_err!(
172
1
                    Code::InvalidArgument,
173
                    "Failed to parse MongoDB connection string: {e}"
174
                )
175
1
            })?;
176
177
14
        client_options.server_selection_timeout =
178
14
            Some(Duration::from_millis(spec.connection_timeout_ms));
179
14
        client_options.connect_timeout = Some(Duration::from_millis(spec.connection_timeout_ms));
180
181
        // Set write concern if specified
182
14
        if let Some(w) = spec.write_concern_w {  [region count: 13]
183
            // Parse write concern w - can be a number or string like "majority"
184
13
            let w_value = if let Ok(num) = w.parse::<u32>() {  [region count: 0]
185
0
                Some(num.into())
186
            } else {
187
13
                Some(w.into())
188
            };
189
190
            // Build write concern based on which options are set
191
13
            let write_concern = match (w_value, spec.write_concern_j, spec.write_concern_timeout_ms)
192
            {
193
13
                (Some(w), Some(j), Some(timeout)) => WriteConcern::builder()
194
13
                    .w(Some(w))
195
13
                    .journal(j)
196
13
                    .w_timeout(Some(Duration::from_millis(u64::from(timeout))))
197
13
                    .build(),
198
0
                (Some(w), Some(j), None) => WriteConcern::builder().w(Some(w)).journal(j).build(),
199
0
                (Some(w), None, Some(timeout)) => WriteConcern::builder()
200
0
                    .w(Some(w))
201
0
                    .w_timeout(Some(Duration::from_millis(u64::from(timeout))))
202
0
                    .build(),
203
0
                (Some(w), None, None) => WriteConcern::builder().w(Some(w)).build(),
204
0
                _ => unreachable!(), // We know w is Some because we're in the if let Some(w) block
205
            };
206
207
13
            client_options.write_concern = Some(write_concern);
208
1
        } else if spec.write_concern_j.is_some() || spec.write_concern_timeout_ms.is_some() {  [region counts: 0, 0]
209
1
            return Err(make_err!(
210
1
                Code::InvalidArgument,
211
1
                "write_concern_w not set, but j and/or timeout set. Please set 'write_concern_w' to a non-default value. See https://www.mongodb.com/docs/manual/reference/write-concern/#w-option for options."
212
1
            ));
213
0
        }
214
215
        // Create client
216
13
        let client = MongoClient::with_options(client_options).map_err(|e| {  [region count: 0]
217
0
            make_err!(
218
0
                Code::InvalidArgument,
219
                "Failed to create MongoDB client: {e}"
220
            )
221
0
        })?;
222
223
        // Get database and collections
224
13
        let database = client.database(&spec.database);
225
13
        let cas_collection = database.collection::<Document>(&spec.cas_collection);
226
13
        let scheduler_collection = database.collection::<Document>(&spec.scheduler_collection);
227
228
        // Create indexes
229
13
        Self::create_indexes(&cas_collection, &scheduler_collection).await?;  [region count: 0]
230
231
13
        let store = Self {
232
13
            client,
233
13
            database,
234
13
            cas_collection,
235
13
            scheduler_collection,
236
13
            key_prefix: spec.key_prefix.clone().unwrap_or_default(),
237
13
            read_chunk_size: spec.read_chunk_size,
238
13
            enable_change_streams: spec.enable_change_streams,
239
13
            subscription_manager: Mutex::new(None),
240
13
            request_permits: Arc::new(Semaphore::new(
241
13
                spec.max_requests.unwrap_or(Semaphore::MAX_PERMITS),
242
13
            )),
243
13
            waiting_permits: Arc::new(AtomicUsize::new(0)),
244
13
        };
245
246
13
        Ok(Arc::new(store))
247
17
    }
248
249
    /// Create necessary indexes for efficient operations.
250
13
    async fn create_indexes(
251
13
        cas_collection: &Collection<Document>,
252
13
        _scheduler_collection: &Collection<Document>,
253
13
    ) -> Result<(), Error> {
254
        // CAS collection indexes
255
13
        cas_collection
256
13
            .create_index(IndexModel::builder().keys(doc! { SIZE_FIELD: 1 }).build())
257
13
            .await
258
13
            .map_err(|e| {  [region count: 0]
259
0
                make_err!(
260
0
                    Code::Internal,
261
                    "Failed to create size index on CAS collection: {e}"
262
                )
263
0
            })?;
264
265
        // Scheduler collection will have dynamic indexes created as needed
266
267
13
        Ok(())
268
13
    }
269
270
    /// Encode a [`StoreKey`] so it can be sent to `MongoDB`.
271
55
    fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> {
272
55
        let key_body = key.as_str();
273
55
        if self.key_prefix.is_empty() {
274
55
            key_body
275
        } else {
276
0
            match key_body {
277
0
                Cow::Owned(mut encoded_key) => {
278
0
                    encoded_key.insert_str(0, &self.key_prefix);
279
0
                    Cow::Owned(encoded_key)
280
                }
281
0
                Cow::Borrowed(body) => {
282
0
                    let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());
283
0
                    encoded_key.push_str(&self.key_prefix);
284
0
                    encoded_key.push_str(body);
285
0
                    Cow::Owned(encoded_key)
286
                }
287
            }
288
        }
289
55
    }
290
291
    /// Decode a key from `MongoDB` by removing the prefix.
292
0
    fn decode_key(&self, key: &str) -> Option<String> {
293
0
        if self.key_prefix.is_empty() {
294
0
            Some(key.to_string())
295
        } else {
296
0
            key.strip_prefix(&self.key_prefix).map(ToString::to_string)
297
        }
298
0
    }
299
300
53
    async fn acquire_permit(&self) -> Result<SemaphorePermit<'_>, Error> {
301
53
        let waiting = self.waiting_permits.fetch_add(1, Ordering::Relaxed);
302
303
53
        if waiting > 0 && waiting.is_multiple_of(100) {  [region counts: 0, 0]
304
0
            info!(waiting, "Number of waiting permits for Mongo");
305
        } else {
306
53
            trace!(waiting, "Number of waiting permits for Mongo");
307
        }
308
53
        let permit = self.request_permits.acquire().await;
309
53
        self.waiting_permits.fetch_sub(1, Ordering::Relaxed);
310
53
        Ok(permit?)  [region count: 0]
311
53
    }
312
}
313
314
#[async_trait]
315
impl StoreDriver for ExperimentalMongoStore {
316
    async fn has_with_results(
317
        self: Pin<&Self>,
318
        keys: &[StoreKey<'_>],
319
        results: &mut [Option<u64>],
320
18
    ) -> Result<(), Error> {
321
        for (key, result) in keys.iter().zip(results.iter_mut()) {
322
            // Handle zero digest specially
323
            if is_zero_digest(key.borrow()) {
324
                *result = Some(0);
325
                continue;
326
            }
327
328
            let encoded_key = self.encode_key(key);
329
            let filter = doc! { KEY_FIELD: encoded_key.as_ref() };
330
331
            // We could do this with acquire_many, but that's unsafe if the number of keys is greater
332
            // than the number of permits, as it'll block forever. Doing this one at a time is guaranteed
333
            // not to block provided no-one sets permits to 0, and we check for that case at startup.
334
            let semaphore = self.acquire_permit().await?;
335
336
            match self.cas_collection.find_one(filter).await {
337
                Ok(Some(doc)) => {
338
17
                    *result = doc.get_i64(SIZE_FIELD).ok().map(|v| v as u64);
339
                }
340
                Ok(None) => {
341
                    *result = None;
342
                }
343
                Err(e) => {
344
                    return Err(make_err!(
345
                        Code::Internal,
346
                        "MongoDB error in has_with_results: {e}"
347
                    ));
348
                }
349
            }
350
            drop(semaphore);
351
        }
352
353
        Ok(())
354
18
    }
355
356
    async fn list(
357
        self: Pin<&Self>,
358
        range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
359
        handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
360
0
    ) -> Result<u64, Error> {
361
        let mut filter = Document::new();
362
363
        // Build range query
364
        let mut key_filter = Document::new();
365
        match &range.0 {
366
            Bound::Included(start) => {
367
                let encoded = self.encode_key(start);
368
                key_filter.insert("$gte", encoded.as_ref());
369
            }
370
            Bound::Excluded(start) => {
371
                let encoded = self.encode_key(start);
372
                key_filter.insert("$gt", encoded.as_ref());
373
            }
374
            Bound::Unbounded => {}
375
        }
376
377
        match &range.1 {
378
            Bound::Included(end) => {
379
                let encoded = self.encode_key(end);
380
                key_filter.insert("$lte", encoded.as_ref());
381
            }
382
            Bound::Excluded(end) => {
383
                let encoded = self.encode_key(end);
384
                key_filter.insert("$lt", encoded.as_ref());
385
            }
386
            Bound::Unbounded => {}
387
        }
388
389
        if !key_filter.is_empty() {
390
            filter.insert(KEY_FIELD, key_filter);
391
        }
392
393
        // Add prefix filter if needed
394
        if !self.key_prefix.is_empty() {
395
            let regex_filter = doc! {
396
                KEY_FIELD: {
397
                    "$regex": format!("^{}", regex::escape(&self.key_prefix)),
398
                }
399
            };
400
            if filter.is_empty() {
401
                filter = regex_filter;
402
            } else {
403
                filter = doc! { "$and": [filter, regex_filter] };
404
            }
405
        }
406
407
        let semaphore = self.acquire_permit().await?;
408
409
        let mut cursor = self
410
            .cas_collection
411
            .find(filter)
412
            .projection(doc! { KEY_FIELD: 1 })
413
            .await
414
0
            .map_err(|e| make_err!(Code::Internal, "Failed to create cursor in list: {e}"))?;
415
416
        let mut count = 0u64;
417
        while let Some(doc) = cursor
418
            .try_next()
419
            .await
420
0
            .map_err(|e| make_err!(Code::Internal, "Failed to get next document in list: {e}"))?
421
        {
422
            if let Ok(key) = doc.get_str(KEY_FIELD)
423
                && let Some(decoded_key) = self.decode_key(key)
424
            {
425
                let store_key = StoreKey::new_str(&decoded_key);
426
                count += 1;
427
                if !handler(&store_key) {
428
                    break;
429
                }
430
            }
431
        }
432
433
        drop(semaphore);
434
435
        Ok(count)
436
0
    }
437
438
    async fn update(
439
        self: Pin<&Self>,
440
        key: StoreKey<'_>,
441
        mut reader: DropCloserReadHalf,
442
        upload_size: UploadSizeInfo,
443
20
    ) -> Result<(), Error> {
444
        let encoded_key = self.encode_key(&key);
445
446
        // Handle zero digest
447
        if is_zero_digest(key.borrow()) {
448
0
            let chunk = reader.peek().await.map_err(|e| {
449
0
                make_err!(
450
0
                    Code::Internal,
451
                    "Failed to peek in ExperimentalMongoStore::update: {e}"
452
                )
453
0
            })?;
454
            if chunk.is_empty() {
455
0
                reader.drain().await.map_err(|e| {
456
0
                    make_err!(
457
0
                        Code::Internal,
458
                        "Failed to drain in ExperimentalMongoStore::update: {e}"
459
                    )
460
0
                })?;
461
                return Ok(());
462
            }
463
        }
464
465
        // Special handling for MaxSize(0) - this should error if stream is closed
466
        if upload_size == UploadSizeInfo::MaxSize(0) {
467
            // Try to read from the stream - if it's closed immediately, this should error
468
            match reader.recv().await {
469
                Ok(_chunk) => {
470
                    return Err(make_input_err!(
471
                        "Received data when MaxSize is 0 in MongoDB store"
472
                    ));
473
                }
474
                Err(e) => {
475
                    return Err(make_err!(
476
                        Code::InvalidArgument,
477
                        "Stream closed for MaxSize(0) upload: {e}"
478
                    ));
479
                }
480
            }
481
        }
482
483
        // Read all data into memory with proper EOF handling
484
        let mut data = Vec::new();
485
        while let Ok(chunk) = reader.recv().await {
486
            if chunk.is_empty() {
487
                break; // Empty chunk signals EOF
488
            }
489
            data.extend_from_slice(&chunk);
490
        }
491
492
        let size = data.len() as i64;
493
494
        // Create document
495
        let doc = doc! {
496
            KEY_FIELD: encoded_key.as_ref(),
497
            DATA_FIELD: Bson::Binary(mongodb::bson::Binary {
498
                subtype: mongodb::bson::spec::BinarySubtype::Generic,
499
                bytes: data,
500
            }),
501
            SIZE_FIELD: size,
502
        };
503
504
        let semaphore = self.acquire_permit().await?;
505
506
        // Upsert the document
507
        self.cas_collection
508
            .update_one(
509
                doc! { KEY_FIELD: encoded_key.as_ref() },
510
                doc! { "$set": doc },
511
            )
512
            .upsert(true)
513
            .await
514
0
            .map_err(|e| make_err!(Code::Internal, "Failed to update document in MongoDB: {e}"))?;
515
516
        drop(semaphore);
517
518
        Ok(())
519
20
    }
520
521
    async fn get_part(
522
        self: Pin<&Self>,
523
        key: StoreKey<'_>,
524
        writer: &mut DropCloserWriteHalf,
525
        offset: u64,
526
        length: Option<u64>,
527
18
    ) -> Result<(), Error> {
528
        // Handle zero digest
529
        if is_zero_digest(key.borrow()) {
530
0
            return writer.send_eof().map_err(|e| {
531
0
                make_err!(
532
0
                    Code::Internal,
533
                    "Failed to send zero EOF in mongo store get_part: {e}"
534
                )
535
0
            });
536
        }
537
538
        let encoded_key = self.encode_key(&key);
539
        let filter = doc! { KEY_FIELD: encoded_key.as_ref() };
540
541
        let semaphore = self.acquire_permit().await?;
542
543
        let doc = self
544
            .cas_collection
545
            .find_one(filter)
546
            .await
547
0
            .map_err(|e| make_err!(Code::Internal, "Failed to find document in get_part: {e}"))?
548
1
            .ok_or_else(|| {
549
1
                make_err!(
550
1
                    Code::NotFound,
551
                    "Data not found in MongoDB store for digest: {key:?}"
552
                )
553
1
            })?;
554
555
        let data = match doc.get(DATA_FIELD) {
556
            Some(Bson::Binary(binary)) => &binary.bytes,
557
            _ => {
558
                return Err(make_err!(
559
                    Code::Internal,
560
                    "Invalid data field in MongoDB document"
561
                ));
562
            }
563
        };
564
565
        let offset = usize::try_from(offset).unwrap_or(usize::MAX);
566
        let data_len = data.len();
567
568
        if offset > data_len {
569
0
            return writer.send_eof().map_err(|e| {
570
0
                make_err!(
571
0
                    Code::Internal,
572
                    "Failed to send EOF in mongo store get_part: {e}"
573
                )
574
0
            });
575
        }
576
577
        let end = if let Some(len) = length {
578
            cmp::min(
579
                offset.saturating_add(usize::try_from(len).unwrap_or(usize::MAX)),
580
                data_len,
581
            )
582
        } else {
583
            data_len
584
        };
585
586
        if offset < end {
587
            let chunk = &data[offset..end];
588
589
            // Send data in chunks
590
            for chunk_data in chunk.chunks(self.read_chunk_size) {
591
0
                writer.send(chunk_data.to_vec().into()).await.map_err(|e| {
592
0
                    make_err!(
593
0
                        Code::Internal,
594
                        "Failed to write data in ExperimentalMongoStore::get_part: {e}"
595
                    )
596
0
                })?;
597
            }
598
        }
599
600
        drop(semaphore);
601
602
0
        writer.send_eof().map_err(|e| {
603
0
            make_err!(
604
0
                Code::Internal,
605
                "Failed to write EOF in mongo store get_part: {e}"
606
            )
607
0
        })
608
18
    }
609
610
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
611
0
        self
612
0
    }
613
614
0
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) {
615
0
        self
616
0
    }
617
618
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> {
619
0
        self
620
0
    }
621
622
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
623
0
        registry.register_indicator(self);
624
0
    }
625
626
0
    fn register_remove_callback(
627
0
        self: Arc<Self>,
628
0
        _callback: Arc<dyn RemoveItemCallback>,
629
0
    ) -> Result<(), Error> {
630
        // drop because we don't remove anything from Mongo
631
0
        Ok(())
632
0
    }
633
}
634
635
#[async_trait]
636
impl HealthStatusIndicator for ExperimentalMongoStore {
637
0
    fn get_name(&self) -> &'static str {
638
0
        "ExperimentalMongoStore"
639
0
    }
640
641
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
642
        // Note we do not acquire a request_permit here, as the health check needs to always go through
643
        // even if everything else is fully loaded
644
        match self.database.run_command(doc! { "ping": 1 }).await {
645
            Ok(_) => HealthStatus::new_ok(self, "Connection healthy".into()),
646
            Err(e) => HealthStatus::new_failed(
647
                self,
648
                format!("{namespace} - MongoDB connection error: {e}").into(),
649
            ),
650
        }
651
0
    }
652
}
653
654
// -------------------------------------------------------------------
655
//
656
// `ExperimentalMongoDB` scheduler implementation. Likely to change
657
// -------------------------------------------------------------------
658
659
/// An individual subscription to a key in `ExperimentalMongoDB`.
660
#[derive(Debug)]
661
pub struct ExperimentalMongoSubscription {
662
    receiver: Option<watch::Receiver<String>>,
663
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<ExperimentalMongoSubscriptionPublisher>>>,
664
}
665
666
impl SchedulerSubscription for ExperimentalMongoSubscription {
667
0
    async fn changed(&mut self) -> Result<(), Error> {
668
0
        let receiver = self.receiver.as_mut().ok_or_else(|| {
669
0
            make_err!(
670
0
                Code::Internal,
671
                "In ExperimentalMongoSubscription::changed::as_mut"
672
            )
673
0
        })?;
674
0
        receiver.changed().await.map_err(|err| {
675
0
            Error::from_std_err(Code::Internal, &err)
676
0
                .append("In ExperimentalMongoSubscription::changed::changed")
677
0
        })
678
0
    }
679
}
680
681
impl Drop for ExperimentalMongoSubscription {
682
0
    fn drop(&mut self) {
683
0
        let Some(receiver) = self.receiver.take() else {
684
0
            warn!("ExperimentalMongoSubscription has already been dropped, nothing to do.");
685
0
            return;
686
        };
687
0
        let key = receiver.borrow().clone();
688
0
        drop(receiver);
689
690
0
        let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else {
691
0
            return;
692
        };
693
0
        let mut subscribed_keys = subscribed_keys.write();
694
0
        let Some(value) = subscribed_keys.get(&key) else {
695
0
            error!(
696
                "Key {key} was not found in subscribed keys when checking if it should be removed."
697
            );
698
0
            return;
699
        };
700
701
0
        if value.receiver_count() == 0 {
702
0
            subscribed_keys.remove(key);
703
0
        }
704
0
    }
705
}
706
707
/// A publisher for a key in `ExperimentalMongoDB`.
708
#[derive(Debug)]
709
struct ExperimentalMongoSubscriptionPublisher {
710
    sender: Mutex<watch::Sender<String>>,
711
}
712
713
impl ExperimentalMongoSubscriptionPublisher {
714
0
    fn new(
715
0
        key: String,
716
0
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
717
0
    ) -> (Self, ExperimentalMongoSubscription) {
718
0
        let (sender, receiver) = watch::channel(key);
719
0
        let publisher = Self {
720
0
            sender: Mutex::new(sender),
721
0
        };
722
0
        let subscription = ExperimentalMongoSubscription {
723
0
            receiver: Some(receiver),
724
0
            weak_subscribed_keys,
725
0
        };
726
0
        (publisher, subscription)
727
0
    }
728
729
0
    fn subscribe(
730
0
        &self,
731
0
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
732
0
    ) -> ExperimentalMongoSubscription {
733
0
        let receiver = self.sender.lock().subscribe();
734
0
        ExperimentalMongoSubscription {
735
0
            receiver: Some(receiver),
736
0
            weak_subscribed_keys,
737
0
        }
738
0
    }
739
740
0
    fn receiver_count(&self) -> usize {
741
0
        self.sender.lock().receiver_count()
742
0
    }
743
744
0
    fn notify(&self) {
745
0
        self.sender.lock().send_modify(|_| {});
746
0
    }
747
}
748
749
#[derive(Debug)]
750
pub struct ExperimentalMongoSubscriptionManager {
751
    subscribed_keys: Arc<RwLock<StringPatriciaMap<ExperimentalMongoSubscriptionPublisher>>>,
752
    _subscription_spawn: JoinHandleDropGuard<()>,
753
}
754
755
impl ExperimentalMongoSubscriptionManager {
    /// Creates a manager that watches `collection_name` in `database` via a
    /// MongoDB change stream and notifies subscribers whose subscribed key is
    /// a prefix of a changed document's key.
    ///
    /// `key_prefix` is stripped from incoming document keys before matching.
    /// The background task reconnects on failure every 5 seconds and exits
    /// once the manager (holder of the strong `Arc`) is dropped.
    pub fn new(database: Database, collection_name: String, key_prefix: String) -> Self {
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
        // The spawned task only holds a weak reference so the task does not
        // keep the manager alive; a failed upgrade means the manager is gone.
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);

        Self {
            subscribed_keys,
            _subscription_spawn: spawn!("mongo_subscribe_spawn", async move {
                let collection = database.collection::<Document>(&collection_name);

                loop {
                    // Try to create change stream
                    let change_stream_result = collection.watch().await;

                    match change_stream_result {
                        Ok(mut change_stream) => {
                            info!("MongoDB change stream connected");

                            while let Some(event_result) = change_stream.next().await {
                                match event_result {
                                    Ok(event) => {
                                        use mongodb::change_stream::event::OperationType;
                                        match event.operation_type {
                                            // Only data-mutating events are relevant;
                                            // drop/invalidate/etc. fall through below.
                                            OperationType::Insert
                                            | OperationType::Update
                                            | OperationType::Replace
                                            | OperationType::Delete => {
                                                if let Some(doc_key) = event.document_key
                                                    && let Ok(key) = doc_key.get_str(KEY_FIELD)
                                                {
                                                    // Remove prefix if present
                                                    let key = if key_prefix.is_empty() {
                                                        key
                                                    } else {
                                                        key.strip_prefix(&key_prefix).unwrap_or(key)
                                                    };

                                                    let Some(subscribed_keys) =
                                                        subscribed_keys_weak.upgrade()
                                                    else {
                                                        warn!(
                                                            "Parent dropped, exiting ExperimentalMongoSubscriptionManager"
                                                        );
                                                        return;
                                                    };

                                                    // Notify every publisher whose subscribed
                                                    // key is a prefix of the changed key.
                                                    let subscribed_keys_mux =
                                                        subscribed_keys.read();
                                                    subscribed_keys_mux
                                                                .common_prefix_values(key)
                                                                .for_each(ExperimentalMongoSubscriptionPublisher::notify);
                                                }
                                            }
                                            _ => {}
                                        }
                                    }
                                    Err(e) => {
                                        // Break out to the reconnect path below.
                                        error!("Error in change stream: {e}");
                                        break;
                                    }
                                }
                            }
                        }
                        Err(e) => {
                            warn!("Failed to create change stream: {e}. Will retry in 5 seconds.");
                        }
                    }

                    // Check if parent is still alive
                    if subscribed_keys_weak.upgrade().is_none() {
                        warn!("Parent dropped, exiting ExperimentalMongoSubscriptionManager");
                        return;
                    }

                    // Sleep before retry
                    sleep(Duration::from_secs(5)).await;

                    // Notify all subscribers on reconnection
                    // (events may have been missed while disconnected, so every
                    // subscriber must re-check its state).
                    if let Some(subscribed_keys) = subscribed_keys_weak.upgrade() {
                        let subscribed_keys_mux = subscribed_keys.read();
                        for publisher in subscribed_keys_mux.values() {
                            publisher.notify();
                        }
                    }
                }
            }),
        }
    }
}
844
845
impl SchedulerSubscriptionManager for ExperimentalMongoSubscriptionManager {
846
    type Subscription = ExperimentalMongoSubscription;
847
848
0
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
849
0
    where
850
0
        K: SchedulerStoreKeyProvider,
851
    {
852
0
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
853
0
        let mut subscribed_keys = self.subscribed_keys.write();
854
0
        let key = key.get_key();
855
0
        let key_str = key.as_str();
856
857
0
        let mut subscription = if let Some(publisher) = subscribed_keys.get(&key_str) {
858
0
            publisher.subscribe(weak_subscribed_keys)
859
        } else {
860
0
            let (publisher, subscription) = ExperimentalMongoSubscriptionPublisher::new(
861
0
                key_str.to_string(),
862
0
                weak_subscribed_keys,
863
0
            );
864
0
            subscribed_keys.insert(key_str, publisher);
865
0
            subscription
866
        };
867
868
0
        subscription
869
0
            .receiver
870
0
            .as_mut()
871
0
            .ok_or_else(|| {
872
0
                make_err!(
873
0
                    Code::Internal,
874
                    "Receiver should be set in ExperimentalMongoSubscriptionManager::subscribe"
875
                )
876
0
            })?
877
0
            .mark_changed();
878
879
0
        Ok(subscription)
880
0
    }
881
882
0
    fn is_reliable() -> bool {
883
0
        true
884
0
    }
885
}
886
887
impl SchedulerStore for ExperimentalMongoStore {
    type SubscriptionManager = ExperimentalMongoSubscriptionManager;

    /// Lazily creates (and then caches) the subscription manager.
    ///
    /// Errors if change streams are disabled in the store configuration,
    /// since subscriptions are driven by a MongoDB change stream.
    async fn subscription_manager(
        &self,
    ) -> Result<Arc<ExperimentalMongoSubscriptionManager>, Error> {
        let mut subscription_manager = self.subscription_manager.lock();
        if let Some(subscription_manager) = &*subscription_manager {
            Ok(subscription_manager.clone())
        } else {
            if !self.enable_change_streams {
                return Err(make_input_err!(
                    "ExperimentalMongoStore must have change streams enabled for scheduler subscriptions"
                ));
            }

            let sub = Arc::new(ExperimentalMongoSubscriptionManager::new(
                self.database.clone(),
                self.scheduler_collection.name().to_string(),
                self.key_prefix.clone(),
            ));
            *subscription_manager = Some(sub.clone());
            Ok(sub)
        }
    }

    /// Writes `data` (and its secondary-index fields) to the scheduler
    /// collection.
    ///
    /// Versioned keys use optimistic concurrency: the update only matches a
    /// document at the caller's `current_version()` and atomically increments
    /// the version. Returns `Some(new_version)` on success, `None` when no
    /// document matched (version conflict), or an error. Unversioned keys are
    /// plain upserts and always return `Some(0)`.
    async fn update_data<T>(&self, data: T) -> Result<Option<i64>, Error>
    where
        T: SchedulerStoreDataProvider
            + SchedulerStoreKeyProvider
            + SchedulerCurrentVersionProvider
            + Send,
    {
        let key = data.get_key();
        let encoded_key = self.encode_key(&key);
        let maybe_index = data.get_indexes().map_err(|e| {
            make_err!(
                Code::Internal,
                "Error getting indexes in ExperimentalMongoStore::update_data: {e}"
            )
        })?;

        if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {
            let current_version = data.current_version();
            let data_bytes = data.try_into_bytes().map_err(|e| {
                make_err!(
                    Code::Internal,
                    "Could not convert value to bytes in ExperimentalMongoStore::update_data: {e}"
                )
            })?;

            // $set writes the payload; $inc bumps the version atomically in
            // the same operation.
            let mut update_doc = doc! {
                "$set": {
                    DATA_FIELD: Bson::Binary(mongodb::bson::Binary {
                        subtype: mongodb::bson::spec::BinarySubtype::Generic,
                        bytes: data_bytes.to_vec(),
                    }),
                },
                "$inc": {
                    VERSION_FIELD: 1i64,
                }
            };

            // Add indexes
            // (each index value is stored as a binary field on the document so
            // search_by_index_prefix can filter on it).
            for (name, value) in maybe_index {
                update_doc.get_document_mut("$set").unwrap().insert(
                    name,
                    Bson::Binary(mongodb::bson::Binary {
                        subtype: mongodb::bson::spec::BinarySubtype::Generic,
                        bytes: value.to_vec(),
                    }),
                );
            }

            // Only match the document if it is still at the expected version.
            let filter = doc! {
                KEY_FIELD: encoded_key.as_ref(),
                VERSION_FIELD: current_version,
            };

            let semaphore = self.acquire_permit().await?;

            // NOTE(review): with upsert(true), a version mismatch on an
            // existing document attempts an insert rather than returning
            // None; presumably a unique index on the key surfaces this as a
            // duplicate-key error — verify intended conflict semantics.
            let result = match self
                .scheduler_collection
                .find_one_and_update(filter, update_doc)
                .upsert(true)
                .return_document(ReturnDocument::After)
                .await
            {
                // Fresh upserts may lack a readable version; report 1.
                Ok(Some(doc)) => Ok(doc.get_i64(VERSION_FIELD).ok().or(Some(1i64))),
                Ok(None) => Ok(None),
                Err(e) => Err(make_err!(
                    Code::Internal,
                    "MongoDB error in update_data: {e}"
                )),
            };
            // Release the permit before returning.
            drop(semaphore);
            result
        } else {
            let data_bytes = data.try_into_bytes().map_err(|e| {
                make_err!(
                    Code::Internal,
                    "Could not convert value to bytes in ExperimentalMongoStore::update_data: {e}"
                )
            })?;

            let mut doc = doc! {
                KEY_FIELD: encoded_key.as_ref(),
                DATA_FIELD: Bson::Binary(mongodb::bson::Binary {
                    subtype: mongodb::bson::spec::BinarySubtype::Generic,
                    bytes: data_bytes.to_vec(),
                }),
            };

            // Add indexes
            for (name, value) in maybe_index {
                doc.insert(
                    name,
                    Bson::Binary(mongodb::bson::Binary {
                        subtype: mongodb::bson::spec::BinarySubtype::Generic,
                        bytes: value.to_vec(),
                    }),
                );
            }

            let semaphore = self.acquire_permit().await?;

            // Unconditional upsert: last writer wins for unversioned keys.
            self.scheduler_collection
                .update_one(
                    doc! { KEY_FIELD: encoded_key.as_ref() },
                    doc! { "$set": doc },
                )
                .upsert(true)
                .await
                .map_err(|e| {
                    make_err!(Code::Internal, "Failed to update scheduler document: {e}")
                })?;

            drop(semaphore);

            // Unversioned writes always report version 0.
            Ok(Some(0))
        }
    }

    /// Streams all documents whose `K::INDEX_NAME` field starts with the
    /// provided index value, decoding each into `K::DecodeOutput`.
    ///
    /// Ensures a MongoDB index on the field first, then performs an anchored
    /// prefix-regex query (input is regex-escaped), optionally sorted by
    /// `K::MAYBE_SORT_KEY`. Decoding happens lazily per streamed document.
    async fn search_by_index_prefix<K>(
        &self,
        index: K,
    ) -> Result<
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
        Error,
    >
    where
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
    {
        let index_value = index.index_value();

        // Create index if it doesn't exist
        // (create_index is idempotent when the definition matches).
        let index_name = format!("{}_{}", K::KEY_PREFIX, K::INDEX_NAME);
        self.scheduler_collection
            .create_index(
                IndexModel::builder()
                    .keys(doc! { K::INDEX_NAME: 1 })
                    .options(IndexOptions::builder().name(index_name).build())
                    .build(),
            )
            .await
            .map_err(|e| make_err!(Code::Internal, "Failed to create scheduler index: {e}"))?;

        // Build filter
        // Anchored, escaped regex implements a literal prefix match.
        let filter = doc! {
            K::INDEX_NAME: {
                "$regex": format!("^{}", regex::escape(index_value.as_ref())),
            }
        };

        // Add sort if specified
        let find_options = if let Some(sort_key) = K::MAYBE_SORT_KEY {
            FindOptions::builder().sort(doc! { sort_key: 1 }).build()
        } else {
            FindOptions::default()
        };

        let cursor = self
            .scheduler_collection
            .find(filter)
            .with_options(find_options)
            .await
            .map_err(|e| {
                make_err!(
                    Code::Internal,
                    "Failed to create cursor in search_by_index_prefix: {e}"
                )
            })?;

        // Map each raw document into the caller's decoded type as the cursor
        // is consumed.
        Ok(cursor.map(move |result| {
            let doc = result.map_err(|e| {
                make_err!(
                    Code::Internal,
                    "Error reading document in search_by_index_prefix: {e}"
                )
            })?;

            // A matched document without a binary data field is treated as an
            // internal error (unlike get_and_decode, which returns None).
            let data = match doc.get(DATA_FIELD) {
                Some(Bson::Binary(binary)) => Bytes::from(binary.bytes.clone()),
                _ => {
                    return Err(make_err!(
                        Code::Internal,
                        "Missing or invalid data field in search_by_index_prefix"
                    ));
                }
            };

            // Unversioned keys decode with version 0; missing/invalid version
            // fields also fall back to 0.
            let version = if <K as SchedulerIndexProvider>::Versioned::VALUE {
                doc.get_i64(VERSION_FIELD).unwrap_or(0)
            } else {
                0
            };

            K::decode(version, data).map_err(|e| {
                make_err!(
                    Code::Internal,
                    "Failed to decode in search_by_index_prefix: {e}"
                )
            })
        }))
    }

    /// Fetches the document for `key` and decodes it into `K::DecodeOutput`.
    ///
    /// Returns `Ok(None)` when the document does not exist or has a missing /
    /// non-binary data field; decode failures are internal errors.
    async fn get_and_decode<K>(
        &self,
        key: K,
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
    where
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
    {
        let key = key.get_key();
        let encoded_key = self.encode_key(&key);
        let filter = doc! { KEY_FIELD: encoded_key.as_ref() };

        let doc = self
            .scheduler_collection
            .find_one(filter)
            .await
            .map_err(|e| {
                make_err!(
                    Code::Internal,
                    "Failed to find document in get_and_decode: {e}"
                )
            })?;

        let Some(doc) = doc else {
            return Ok(None);
        };

        // Malformed documents (no binary payload) are reported as absent.
        let data = match doc.get(DATA_FIELD) {
            Some(Bson::Binary(binary)) => Bytes::from(binary.bytes.clone()),
            _ => return Ok(None),
        };

        let version = if <K as SchedulerStoreKeyProvider>::Versioned::VALUE {
            doc.get_i64(VERSION_FIELD).unwrap_or(0)
        } else {
            0
        };

        Ok(Some(K::decode(version, data).map_err(|e| {
            make_err!(Code::Internal, "Failed to decode in get_and_decode: {e}")
        })?))
    }
}