Coverage Report

Created: 2026-02-23 10:49

/build/source/nativelink-store/src/mongo_store.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::ops::Bound;
17
use core::pin::Pin;
18
use core::time::Duration;
19
use std::borrow::Cow;
20
use std::sync::{Arc, Weak};
21
22
use async_trait::async_trait;
23
use bytes::Bytes;
24
use futures::stream::{Stream, StreamExt, TryStreamExt};
25
use mongodb::bson::{Bson, Document, doc};
26
use mongodb::options::{ClientOptions, FindOptions, IndexOptions, ReturnDocument, WriteConcern};
27
use mongodb::{Client as MongoClient, Collection, Database, IndexModel};
28
use nativelink_config::stores::ExperimentalMongoSpec;
29
use nativelink_error::{Code, Error, make_err, make_input_err};
30
use nativelink_metric::MetricsComponent;
31
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
32
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
33
use nativelink_util::spawn;
34
use nativelink_util::store_trait::{
35
    BoolValue, RemoveItemCallback, SchedulerCurrentVersionProvider, SchedulerIndexProvider,
36
    SchedulerStore, SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
37
    SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,
38
};
39
use nativelink_util::task::JoinHandleDropGuard;
40
use parking_lot::{Mutex, RwLock};
41
use patricia_tree::StringPatriciaMap;
42
use tokio::sync::watch;
43
use tokio::time::sleep;
44
use tracing::{error, info, warn};
45
46
use crate::cas_utils::is_zero_digest;
47
48
/// The default database name if not specified.
49
const DEFAULT_DB_NAME: &str = "nativelink";
50
51
/// The default collection name if not specified.
52
const DEFAULT_COLLECTION_NAME: &str = "cas";
53
54
/// The default scheduler collection name if not specified.
55
const DEFAULT_SCHEDULER_COLLECTION_NAME: &str = "scheduler";
56
57
/// The default size of the read chunk when reading data from `MongoDB`.
58
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;
59
60
/// The default connection timeout in milliseconds if not specified.
61
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;
62
63
/// The default command timeout in milliseconds if not specified.
64
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;
65
66
/// The default maximum number of concurrent uploads.
67
const DEFAULT_MAX_CONCURRENT_UPLOADS: usize = 10;
68
69
/// The name of the field in `MongoDB` documents that stores the key.
70
const KEY_FIELD: &str = "_id";
71
72
/// The name of the field in `MongoDB` documents that stores the data.
73
const DATA_FIELD: &str = "data";
74
75
/// The name of the field in `MongoDB` documents that stores the version.
76
const VERSION_FIELD: &str = "version";
77
78
/// The name of the field in `MongoDB` documents that stores the size.
79
const SIZE_FIELD: &str = "size";
80
81
/// A [`StoreDriver`] implementation that uses `MongoDB` as a backing store.
82
#[derive(Debug, MetricsComponent)]
83
pub struct ExperimentalMongoStore {
84
    /// The `MongoDB` client.
85
    #[allow(dead_code)]
86
    client: MongoClient,
87
88
    /// The database to use.
89
    database: Database,
90
91
    /// The collection for CAS data.
92
    cas_collection: Collection<Document>,
93
94
    /// The collection for scheduler data.
95
    scheduler_collection: Collection<Document>,
96
97
    /// A common prefix to prepend to all keys before they are sent to `MongoDB`.
98
    #[metric(help = "Prefix to append to all keys before sending to MongoDB")]
99
    key_prefix: String,
100
101
    /// The amount of data to read from `MongoDB` at a time.
102
    #[metric(help = "The amount of data to read from MongoDB at a time")]
103
    read_chunk_size: usize,
104
105
    /// The maximum number of concurrent uploads.
106
    #[metric(help = "The maximum number of concurrent uploads")]
107
    max_concurrent_uploads: usize,
108
109
    /// Enable change streams for real-time updates.
110
    #[metric(help = "Whether change streams are enabled")]
111
    enable_change_streams: bool,
112
113
    /// A manager for subscriptions to keys in `MongoDB`.
114
    subscription_manager: Mutex<Option<Arc<ExperimentalMongoSubscriptionManager>>>,
115
}
116
117
impl ExperimentalMongoStore {
118
    /// Create a new `ExperimentalMongoStore` from the given configuration.
119
3
    pub async fn new(mut spec: ExperimentalMongoSpec) -> Result<Arc<Self>, Error> {
120
        // Set defaults
121
3
        if spec.connection_string.is_empty() {
  Branch (121:12): [True: 0, False: 0]
  Branch (121:12): [True: 1, False: 2]
  Branch (121:12): [Folded - Ignored]
122
1
            return Err(make_err!(
123
1
                Code::InvalidArgument,
124
1
                "No connection string was specified in mongo store configuration."
125
1
            ));
126
2
        }
127
128
2
        if spec.database.is_empty() {
  Branch (128:12): [True: 0, False: 0]
  Branch (128:12): [True: 0, False: 2]
  Branch (128:12): [Folded - Ignored]
129
0
            spec.database = DEFAULT_DB_NAME.to_string();
130
2
        }
131
132
2
        if spec.cas_collection.is_empty() {
  Branch (132:12): [True: 0, False: 0]
  Branch (132:12): [True: 0, False: 2]
  Branch (132:12): [Folded - Ignored]
133
0
            spec.cas_collection = DEFAULT_COLLECTION_NAME.to_string();
134
2
        }
135
136
2
        if spec.scheduler_collection.is_empty() {
  Branch (136:12): [True: 0, False: 0]
  Branch (136:12): [True: 0, False: 2]
  Branch (136:12): [Folded - Ignored]
137
0
            spec.scheduler_collection = DEFAULT_SCHEDULER_COLLECTION_NAME.to_string();
138
2
        }
139
140
2
        if spec.read_chunk_size == 0 {
  Branch (140:12): [True: 0, False: 0]
  Branch (140:12): [True: 0, False: 2]
  Branch (140:12): [Folded - Ignored]
141
0
            spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
142
2
        }
143
144
2
        if spec.connection_timeout_ms == 0 {
  Branch (144:12): [True: 0, False: 0]
  Branch (144:12): [True: 0, False: 2]
  Branch (144:12): [Folded - Ignored]
145
0
            spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS;
146
2
        }
147
148
2
        if spec.command_timeout_ms == 0 {
  Branch (148:12): [True: 0, False: 0]
  Branch (148:12): [True: 0, False: 2]
  Branch (148:12): [Folded - Ignored]
149
0
            spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS;
150
2
        }
151
152
2
        if spec.max_concurrent_uploads == 0 {
  Branch (152:12): [True: 0, False: 0]
  Branch (152:12): [True: 0, False: 2]
  Branch (152:12): [Folded - Ignored]
153
0
            spec.max_concurrent_uploads = DEFAULT_MAX_CONCURRENT_UPLOADS;
154
2
        }
155
156
        // Configure client options
157
2
        let mut client_options = ClientOptions::parse(&spec.connection_string)
158
2
            .await
159
2
            .map_err(|e| {
160
1
                make_err!(
161
1
                    Code::InvalidArgument,
162
                    "Failed to parse MongoDB connection string: {e}"
163
                )
164
1
            })?;
165
166
1
        client_options.server_selection_timeout =
167
1
            Some(Duration::from_millis(spec.connection_timeout_ms));
168
1
        client_options.connect_timeout = Some(Duration::from_millis(spec.connection_timeout_ms));
169
170
        // Set write concern if specified
171
1
        if let Some(w) = spec.write_concern_w {
  Branch (171:16): [True: 0, False: 0]
  Branch (171:16): [True: 0, False: 1]
  Branch (171:16): [Folded - Ignored]
172
            // Parse write concern w - can be a number or string like "majority"
173
0
            let w_value = if let Ok(num) = w.parse::<u32>() {
  Branch (173:34): [True: 0, False: 0]
  Branch (173:34): [True: 0, False: 0]
  Branch (173:34): [Folded - Ignored]
174
0
                Some(num.into())
175
            } else {
176
0
                Some(w.into())
177
            };
178
179
            // Build write concern based on which options are set
180
0
            let write_concern = match (w_value, spec.write_concern_j, spec.write_concern_timeout_ms)
181
            {
182
0
                (Some(w), Some(j), Some(timeout)) => WriteConcern::builder()
183
0
                    .w(Some(w))
184
0
                    .journal(j)
185
0
                    .w_timeout(Some(Duration::from_millis(u64::from(timeout))))
186
0
                    .build(),
187
0
                (Some(w), Some(j), None) => WriteConcern::builder().w(Some(w)).journal(j).build(),
188
0
                (Some(w), None, Some(timeout)) => WriteConcern::builder()
189
0
                    .w(Some(w))
190
0
                    .w_timeout(Some(Duration::from_millis(u64::from(timeout))))
191
0
                    .build(),
192
0
                (Some(w), None, None) => WriteConcern::builder().w(Some(w)).build(),
193
0
                _ => unreachable!(), // We know w is Some because we're in the if let Some(w) block
194
            };
195
196
0
            client_options.write_concern = Some(write_concern);
197
1
        } else if spec.write_concern_j.is_some() || spec.write_concern_timeout_ms.is_some() {
  Branch (197:19): [True: 0, False: 0]
  Branch (197:53): [True: 0, False: 0]
  Branch (197:19): [True: 1, False: 0]
  Branch (197:53): [True: 0, False: 0]
  Branch (197:19): [Folded - Ignored]
  Branch (197:53): [Folded - Ignored]
198
1
            return Err(make_err!(
199
1
                Code::InvalidArgument,
200
1
                "write_concern_w not set, but j and/or timeout set. Please set 'write_concern_w' to a non-default value. See https://www.mongodb.com/docs/manual/reference/write-concern/#w-option for options."
201
1
            ));
202
0
        }
203
204
        // Create client
205
0
        let client = MongoClient::with_options(client_options).map_err(|e| {
206
0
            make_err!(
207
0
                Code::InvalidArgument,
208
                "Failed to create MongoDB client: {e}"
209
            )
210
0
        })?;
211
212
        // Get database and collections
213
0
        let database = client.database(&spec.database);
214
0
        let cas_collection = database.collection::<Document>(&spec.cas_collection);
215
0
        let scheduler_collection = database.collection::<Document>(&spec.scheduler_collection);
216
217
        // Create indexes
218
0
        Self::create_indexes(&cas_collection, &scheduler_collection).await?;
219
220
0
        let store = Self {
221
0
            client,
222
0
            database,
223
0
            cas_collection,
224
0
            scheduler_collection,
225
0
            key_prefix: spec.key_prefix.clone().unwrap_or_default(),
226
0
            read_chunk_size: spec.read_chunk_size,
227
0
            max_concurrent_uploads: spec.max_concurrent_uploads,
228
0
            enable_change_streams: spec.enable_change_streams,
229
0
            subscription_manager: Mutex::new(None),
230
0
        };
231
232
0
        Ok(Arc::new(store))
233
3
    }
234
235
    /// Create necessary indexes for efficient operations.
236
0
    async fn create_indexes(
237
0
        cas_collection: &Collection<Document>,
238
0
        _scheduler_collection: &Collection<Document>,
239
0
    ) -> Result<(), Error> {
240
        // CAS collection indexes
241
0
        cas_collection
242
0
            .create_index(IndexModel::builder().keys(doc! { SIZE_FIELD: 1 }).build())
243
0
            .await
244
0
            .map_err(|e| {
245
0
                make_err!(
246
0
                    Code::Internal,
247
                    "Failed to create size index on CAS collection: {e}"
248
                )
249
0
            })?;
250
251
        // Scheduler collection will have dynamic indexes created as needed
252
253
0
        Ok(())
254
0
    }
255
256
    /// Encode a [`StoreKey`] so it can be sent to `MongoDB`.
257
0
    fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> {
258
0
        let key_body = key.as_str();
259
0
        if self.key_prefix.is_empty() {
  Branch (259:12): [True: 0, False: 0]
  Branch (259:12): [Folded - Ignored]
260
0
            key_body
261
        } else {
262
0
            match key_body {
263
0
                Cow::Owned(mut encoded_key) => {
264
0
                    encoded_key.insert_str(0, &self.key_prefix);
265
0
                    Cow::Owned(encoded_key)
266
                }
267
0
                Cow::Borrowed(body) => {
268
0
                    let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());
269
0
                    encoded_key.push_str(&self.key_prefix);
270
0
                    encoded_key.push_str(body);
271
0
                    Cow::Owned(encoded_key)
272
                }
273
            }
274
        }
275
0
    }
276
277
    /// Decode a key from `MongoDB` by removing the prefix.
278
0
    fn decode_key(&self, key: &str) -> Option<String> {
279
0
        if self.key_prefix.is_empty() {
  Branch (279:12): [True: 0, False: 0]
  Branch (279:12): [Folded - Ignored]
280
0
            Some(key.to_string())
281
        } else {
282
0
            key.strip_prefix(&self.key_prefix).map(ToString::to_string)
283
        }
284
0
    }
285
}
286
287
#[async_trait]
288
impl StoreDriver for ExperimentalMongoStore {
289
    async fn has_with_results(
290
        self: Pin<&Self>,
291
        keys: &[StoreKey<'_>],
292
        results: &mut [Option<u64>],
293
0
    ) -> Result<(), Error> {
294
        for (key, result) in keys.iter().zip(results.iter_mut()) {
295
            // Handle zero digest specially
296
            if is_zero_digest(key.borrow()) {
297
                *result = Some(0);
298
                continue;
299
            }
300
301
            let encoded_key = self.encode_key(key);
302
            let filter = doc! { KEY_FIELD: encoded_key.as_ref() };
303
304
            match self.cas_collection.find_one(filter).await {
305
                Ok(Some(doc)) => {
306
0
                    *result = doc.get_i64(SIZE_FIELD).ok().map(|v| v as u64);
307
                }
308
                Ok(None) => {
309
                    *result = None;
310
                }
311
                Err(e) => {
312
                    return Err(make_err!(
313
                        Code::Internal,
314
                        "MongoDB error in has_with_results: {e}"
315
                    ));
316
                }
317
            }
318
        }
319
        Ok(())
320
0
    }
321
322
    async fn list(
323
        self: Pin<&Self>,
324
        range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
325
        handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
326
0
    ) -> Result<u64, Error> {
327
        let mut filter = Document::new();
328
329
        // Build range query
330
        let mut key_filter = Document::new();
331
        match &range.0 {
332
            Bound::Included(start) => {
333
                let encoded = self.encode_key(start);
334
                key_filter.insert("$gte", encoded.as_ref());
335
            }
336
            Bound::Excluded(start) => {
337
                let encoded = self.encode_key(start);
338
                key_filter.insert("$gt", encoded.as_ref());
339
            }
340
            Bound::Unbounded => {}
341
        }
342
343
        match &range.1 {
344
            Bound::Included(end) => {
345
                let encoded = self.encode_key(end);
346
                key_filter.insert("$lte", encoded.as_ref());
347
            }
348
            Bound::Excluded(end) => {
349
                let encoded = self.encode_key(end);
350
                key_filter.insert("$lt", encoded.as_ref());
351
            }
352
            Bound::Unbounded => {}
353
        }
354
355
        if !key_filter.is_empty() {
356
            filter.insert(KEY_FIELD, key_filter);
357
        }
358
359
        // Add prefix filter if needed
360
        if !self.key_prefix.is_empty() {
361
            let regex_filter = doc! {
362
                KEY_FIELD: {
363
                    "$regex": format!("^{}", regex::escape(&self.key_prefix)),
364
                }
365
            };
366
            if filter.is_empty() {
367
                filter = regex_filter;
368
            } else {
369
                filter = doc! { "$and": [filter, regex_filter] };
370
            }
371
        }
372
373
        let mut cursor = self
374
            .cas_collection
375
            .find(filter)
376
            .projection(doc! { KEY_FIELD: 1 })
377
            .await
378
0
            .map_err(|e| make_err!(Code::Internal, "Failed to create cursor in list: {e}"))?;
379
380
        let mut count = 0u64;
381
        while let Some(doc) = cursor
382
            .try_next()
383
            .await
384
0
            .map_err(|e| make_err!(Code::Internal, "Failed to get next document in list: {e}"))?
385
        {
386
            if let Ok(key) = doc.get_str(KEY_FIELD) {
387
                if let Some(decoded_key) = self.decode_key(key) {
388
                    let store_key = StoreKey::new_str(&decoded_key);
389
                    count += 1;
390
                    if !handler(&store_key) {
391
                        break;
392
                    }
393
                }
394
            }
395
        }
396
397
        Ok(count)
398
0
    }
399
400
    async fn update(
401
        self: Pin<&Self>,
402
        key: StoreKey<'_>,
403
        mut reader: DropCloserReadHalf,
404
        upload_size: UploadSizeInfo,
405
0
    ) -> Result<(), Error> {
406
        let encoded_key = self.encode_key(&key);
407
408
        // Handle zero digest
409
        if is_zero_digest(key.borrow()) {
410
0
            let chunk = reader.peek().await.map_err(|e| {
411
0
                make_err!(
412
0
                    Code::Internal,
413
                    "Failed to peek in ExperimentalMongoStore::update: {e}"
414
                )
415
0
            })?;
416
            if chunk.is_empty() {
417
0
                reader.drain().await.map_err(|e| {
418
0
                    make_err!(
419
0
                        Code::Internal,
420
                        "Failed to drain in ExperimentalMongoStore::update: {e}"
421
                    )
422
0
                })?;
423
                return Ok(());
424
            }
425
        }
426
427
        // Special handling for MaxSize(0) - this should error if stream is closed
428
        if upload_size == UploadSizeInfo::MaxSize(0) {
429
            // Try to read from the stream - if it's closed immediately, this should error
430
            match reader.recv().await {
431
                Ok(_chunk) => {
432
                    return Err(make_input_err!(
433
                        "Received data when MaxSize is 0 in MongoDB store"
434
                    ));
435
                }
436
                Err(e) => {
437
                    return Err(make_err!(
438
                        Code::InvalidArgument,
439
                        "Stream closed for MaxSize(0) upload: {e}"
440
                    ));
441
                }
442
            }
443
        }
444
445
        // Read all data into memory with proper EOF handling
446
        let mut data = Vec::new();
447
        while let Ok(chunk) = reader.recv().await {
448
            if chunk.is_empty() {
449
                break; // Empty chunk signals EOF
450
            }
451
            data.extend_from_slice(&chunk);
452
        }
453
454
        let size = data.len() as i64;
455
456
        // Create document
457
        let doc = doc! {
458
            KEY_FIELD: encoded_key.as_ref(),
459
            DATA_FIELD: Bson::Binary(mongodb::bson::Binary {
460
                subtype: mongodb::bson::spec::BinarySubtype::Generic,
461
                bytes: data,
462
            }),
463
            SIZE_FIELD: size,
464
        };
465
466
        // Upsert the document
467
        self.cas_collection
468
            .update_one(
469
                doc! { KEY_FIELD: encoded_key.as_ref() },
470
                doc! { "$set": doc },
471
            )
472
            .upsert(true)
473
            .await
474
0
            .map_err(|e| make_err!(Code::Internal, "Failed to update document in MongoDB: {e}"))?;
475
476
        Ok(())
477
0
    }
478
479
    async fn get_part(
480
        self: Pin<&Self>,
481
        key: StoreKey<'_>,
482
        writer: &mut DropCloserWriteHalf,
483
        offset: u64,
484
        length: Option<u64>,
485
0
    ) -> Result<(), Error> {
486
        // Handle zero digest
487
        if is_zero_digest(key.borrow()) {
488
0
            return writer.send_eof().map_err(|e| {
489
0
                make_err!(
490
0
                    Code::Internal,
491
                    "Failed to send zero EOF in mongo store get_part: {e}"
492
                )
493
0
            });
494
        }
495
496
        let encoded_key = self.encode_key(&key);
497
        let filter = doc! { KEY_FIELD: encoded_key.as_ref() };
498
499
        let doc = self
500
            .cas_collection
501
            .find_one(filter)
502
            .await
503
0
            .map_err(|e| make_err!(Code::Internal, "Failed to find document in get_part: {e}"))?
504
0
            .ok_or_else(|| {
505
0
                make_err!(
506
0
                    Code::NotFound,
507
                    "Data not found in MongoDB store for digest: {key:?}"
508
                )
509
0
            })?;
510
511
        let data = match doc.get(DATA_FIELD) {
512
            Some(Bson::Binary(binary)) => &binary.bytes,
513
            _ => {
514
                return Err(make_err!(
515
                    Code::Internal,
516
                    "Invalid data field in MongoDB document"
517
                ));
518
            }
519
        };
520
521
        let offset = usize::try_from(offset).unwrap_or(usize::MAX);
522
        let data_len = data.len();
523
524
        if offset > data_len {
525
0
            return writer.send_eof().map_err(|e| {
526
0
                make_err!(
527
0
                    Code::Internal,
528
                    "Failed to send EOF in mongo store get_part: {e}"
529
                )
530
0
            });
531
        }
532
533
        let end = if let Some(len) = length {
534
            cmp::min(
535
                offset.saturating_add(usize::try_from(len).unwrap_or(usize::MAX)),
536
                data_len,
537
            )
538
        } else {
539
            data_len
540
        };
541
542
        if offset < end {
543
            let chunk = &data[offset..end];
544
545
            // Send data in chunks
546
            for chunk_data in chunk.chunks(self.read_chunk_size) {
547
0
                writer.send(chunk_data.to_vec().into()).await.map_err(|e| {
548
0
                    make_err!(
549
0
                        Code::Internal,
550
                        "Failed to write data in ExperimentalMongoStore::get_part: {e}"
551
                    )
552
0
                })?;
553
            }
554
        }
555
556
0
        writer.send_eof().map_err(|e| {
557
0
            make_err!(
558
0
                Code::Internal,
559
                "Failed to write EOF in mongo store get_part: {e}"
560
            )
561
0
        })
562
0
    }
563
564
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
565
0
        self
566
0
    }
567
568
0
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) {
569
0
        self
570
0
    }
571
572
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> {
573
0
        self
574
0
    }
575
576
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
577
0
        registry.register_indicator(self);
578
0
    }
579
580
0
    fn register_remove_callback(
581
0
        self: Arc<Self>,
582
0
        _callback: Arc<dyn RemoveItemCallback>,
583
0
    ) -> Result<(), Error> {
584
        // drop because we don't remove anything from Mongo
585
0
        Ok(())
586
0
    }
587
}
588
589
#[async_trait]
590
impl HealthStatusIndicator for ExperimentalMongoStore {
591
0
    fn get_name(&self) -> &'static str {
592
0
        "ExperimentalMongoStore"
593
0
    }
594
595
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
596
        match self.database.run_command(doc! { "ping": 1 }).await {
597
            Ok(_) => HealthStatus::new_ok(self, "Connection healthy".into()),
598
            Err(e) => HealthStatus::new_failed(
599
                self,
600
                format!("{namespace} - MongoDB connection error: {e}").into(),
601
            ),
602
        }
603
0
    }
604
}
605
606
// -------------------------------------------------------------------
607
//
608
// `ExperimentalMongoDB` scheduler implementation. Likely to change
609
// -------------------------------------------------------------------
610
611
/// An individual subscription to a key in `ExperimentalMongoDB`.
612
#[derive(Debug)]
613
pub struct ExperimentalMongoSubscription {
614
    receiver: Option<watch::Receiver<String>>,
615
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<ExperimentalMongoSubscriptionPublisher>>>,
616
}
617
618
impl SchedulerSubscription for ExperimentalMongoSubscription {
619
0
    async fn changed(&mut self) -> Result<(), Error> {
620
0
        let receiver = self.receiver.as_mut().ok_or_else(|| {
621
0
            make_err!(
622
0
                Code::Internal,
623
                "In ExperimentalMongoSubscription::changed::as_mut"
624
            )
625
0
        })?;
626
0
        receiver.changed().await.map_err(|_| {
627
0
            make_err!(
628
0
                Code::Internal,
629
                "In ExperimentalMongoSubscription::changed::changed"
630
            )
631
0
        })
632
0
    }
633
}
634
635
impl Drop for ExperimentalMongoSubscription {
636
0
    fn drop(&mut self) {
637
0
        let Some(receiver) = self.receiver.take() else {
  Branch (637:13): [True: 0, False: 0]
  Branch (637:13): [Folded - Ignored]
638
0
            warn!("ExperimentalMongoSubscription has already been dropped, nothing to do.");
639
0
            return;
640
        };
641
0
        let key = receiver.borrow().clone();
642
0
        drop(receiver);
643
644
0
        let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else {
  Branch (644:13): [True: 0, False: 0]
  Branch (644:13): [Folded - Ignored]
645
0
            return;
646
        };
647
0
        let mut subscribed_keys = subscribed_keys.write();
648
0
        let Some(value) = subscribed_keys.get(&key) else {
  Branch (648:13): [True: 0, False: 0]
  Branch (648:13): [Folded - Ignored]
649
0
            error!(
650
0
                "Key {key} was not found in subscribed keys when checking if it should be removed."
651
            );
652
0
            return;
653
        };
654
655
0
        if value.receiver_count() == 0 {
  Branch (655:12): [True: 0, False: 0]
  Branch (655:12): [Folded - Ignored]
656
0
            subscribed_keys.remove(key);
657
0
        }
658
0
    }
659
}
660
661
/// A publisher for a key in `ExperimentalMongoDB`.
662
#[derive(Debug)]
663
struct ExperimentalMongoSubscriptionPublisher {
664
    sender: Mutex<watch::Sender<String>>,
665
}
666
667
impl ExperimentalMongoSubscriptionPublisher {
668
0
    fn new(
669
0
        key: String,
670
0
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
671
0
    ) -> (Self, ExperimentalMongoSubscription) {
672
0
        let (sender, receiver) = watch::channel(key);
673
0
        let publisher = Self {
674
0
            sender: Mutex::new(sender),
675
0
        };
676
0
        let subscription = ExperimentalMongoSubscription {
677
0
            receiver: Some(receiver),
678
0
            weak_subscribed_keys,
679
0
        };
680
0
        (publisher, subscription)
681
0
    }
682
683
0
    fn subscribe(
684
0
        &self,
685
0
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
686
0
    ) -> ExperimentalMongoSubscription {
687
0
        let receiver = self.sender.lock().subscribe();
688
0
        ExperimentalMongoSubscription {
689
0
            receiver: Some(receiver),
690
0
            weak_subscribed_keys,
691
0
        }
692
0
    }
693
694
0
    fn receiver_count(&self) -> usize {
695
0
        self.sender.lock().receiver_count()
696
0
    }
697
698
0
    fn notify(&self) {
699
0
        self.sender.lock().send_modify(|_| {});
700
0
    }
701
}
702
703
#[derive(Debug)]
704
pub struct ExperimentalMongoSubscriptionManager {
705
    subscribed_keys: Arc<RwLock<StringPatriciaMap<ExperimentalMongoSubscriptionPublisher>>>,
706
    _subscription_spawn: JoinHandleDropGuard<()>,
707
}
708
709
impl ExperimentalMongoSubscriptionManager {
710
0
    pub fn new(database: Database, collection_name: String, key_prefix: String) -> Self {
711
0
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
712
0
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);
713
714
        Self {
715
0
            subscribed_keys,
716
0
            _subscription_spawn: spawn!("mongo_subscribe_spawn", async move {
717
0
                let collection = database.collection::<Document>(&collection_name);
718
719
                loop {
720
                    // Try to create change stream
721
0
                    let change_stream_result = collection.watch().await;
722
723
0
                    match change_stream_result {
724
0
                        Ok(mut change_stream) => {
725
0
                            info!("MongoDB change stream connected");
726
727
0
                            while let Some(event_result) = change_stream.next().await {
  Branch (727:39): [True: 0, False: 0]
  Branch (727:39): [Folded - Ignored]
728
0
                                match event_result {
729
0
                                    Ok(event) => {
730
                                        use mongodb::change_stream::event::OperationType;
731
0
                                        match event.operation_type {
732
                                            OperationType::Insert
733
                                            | OperationType::Update
734
                                            | OperationType::Replace
735
                                            | OperationType::Delete => {
736
0
                                                if let Some(doc_key) = event.document_key {
  Branch (736:56): [True: 0, False: 0]
  Branch (736:56): [Folded - Ignored]
737
0
                                                    if let Ok(key) = doc_key.get_str(KEY_FIELD) {
  Branch (737:60): [True: 0, False: 0]
  Branch (737:60): [Folded - Ignored]
738
                                                        // Remove prefix if present
739
0
                                                        let key = if key_prefix.is_empty() {
  Branch (739:70): [True: 0, False: 0]
  Branch (739:70): [Folded - Ignored]
740
0
                                                            key
741
                                                        } else {
742
0
                                                            key.strip_prefix(&key_prefix)
743
0
                                                                .unwrap_or(key)
744
                                                        };
745
746
0
                                                        let Some(subscribed_keys) =
  Branch (746:61): [True: 0, False: 0]
  Branch (746:61): [Folded - Ignored]
747
0
                                                            subscribed_keys_weak.upgrade()
748
                                                        else {
749
0
                                                            warn!(
750
0
                                                                "Parent dropped, exiting ExperimentalMongoSubscriptionManager"
751
                                                            );
752
0
                                                            return;
753
                                                        };
754
755
0
                                                        let subscribed_keys_mux =
756
0
                                                            subscribed_keys.read();
757
0
                                                        subscribed_keys_mux
758
0
                                                                .common_prefix_values(key)
759
0
                                                                .for_each(ExperimentalMongoSubscriptionPublisher::notify);
760
0
                                                    }
761
0
                                                }
762
                                            }
763
0
                                            _ => {}
764
                                        }
765
                                    }
766
0
                                    Err(e) => {
767
0
                                        error!("Error in change stream: {e}");
768
0
                                        break;
769
                                    }
770
                                }
771
                            }
772
                        }
773
0
                        Err(e) => {
774
0
                            warn!("Failed to create change stream: {e}. Will retry in 5 seconds.");
775
                        }
776
                    }
777
778
                    // Check if parent is still alive
779
0
                    if subscribed_keys_weak.upgrade().is_none() {
  Branch (779:24): [True: 0, False: 0]
  Branch (779:24): [Folded - Ignored]
780
0
                        warn!("Parent dropped, exiting ExperimentalMongoSubscriptionManager");
781
0
                        return;
782
0
                    }
783
784
                    // Sleep before retry
785
0
                    sleep(Duration::from_secs(5)).await;
786
787
                    // Notify all subscribers on reconnection
788
0
                    if let Some(subscribed_keys) = subscribed_keys_weak.upgrade() {
  Branch (788:28): [True: 0, False: 0]
  Branch (788:28): [Folded - Ignored]
789
0
                        let subscribed_keys_mux = subscribed_keys.read();
790
0
                        for publisher in subscribed_keys_mux.values() {
791
0
                            publisher.notify();
792
0
                        }
793
0
                    }
794
                }
795
0
            }),
796
        }
797
0
    }
798
}
799
800
impl SchedulerSubscriptionManager for ExperimentalMongoSubscriptionManager {
801
    type Subscription = ExperimentalMongoSubscription;
802
803
0
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
804
0
    where
805
0
        K: SchedulerStoreKeyProvider,
806
    {
807
0
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
808
0
        let mut subscribed_keys = self.subscribed_keys.write();
809
0
        let key = key.get_key();
810
0
        let key_str = key.as_str();
811
812
0
        let mut subscription = if let Some(publisher) = subscribed_keys.get(&key_str) {
  Branch (812:39): [Folded - Ignored]
  Branch (812:39): [Folded - Ignored]
813
0
            publisher.subscribe(weak_subscribed_keys)
814
        } else {
815
0
            let (publisher, subscription) = ExperimentalMongoSubscriptionPublisher::new(
816
0
                key_str.to_string(),
817
0
                weak_subscribed_keys,
818
0
            );
819
0
            subscribed_keys.insert(key_str, publisher);
820
0
            subscription
821
        };
822
823
0
        subscription
824
0
            .receiver
825
0
            .as_mut()
826
0
            .ok_or_else(|| {
827
0
                make_err!(
828
0
                    Code::Internal,
829
                    "Receiver should be set in ExperimentalMongoSubscriptionManager::subscribe"
830
                )
831
0
            })?
832
0
            .mark_changed();
833
834
0
        Ok(subscription)
835
0
    }
836
837
0
    fn is_reliable() -> bool {
838
0
        true
839
0
    }
840
}
841
842
impl SchedulerStore for ExperimentalMongoStore {
843
    type SubscriptionManager = ExperimentalMongoSubscriptionManager;
844
845
0
    fn subscription_manager(&self) -> Result<Arc<ExperimentalMongoSubscriptionManager>, Error> {
846
0
        let mut subscription_manager = self.subscription_manager.lock();
847
0
        if let Some(subscription_manager) = &*subscription_manager {
  Branch (847:16): [True: 0, False: 0]
  Branch (847:16): [Folded - Ignored]
848
0
            Ok(subscription_manager.clone())
849
        } else {
850
0
            if !self.enable_change_streams {
  Branch (850:16): [True: 0, False: 0]
  Branch (850:16): [Folded - Ignored]
851
0
                return Err(make_input_err!(
852
0
                    "ExperimentalMongoStore must have change streams enabled for scheduler subscriptions"
853
0
                ));
854
0
            }
855
856
0
            let sub = Arc::new(ExperimentalMongoSubscriptionManager::new(
857
0
                self.database.clone(),
858
0
                self.scheduler_collection.name().to_string(),
859
0
                self.key_prefix.clone(),
860
            ));
861
0
            *subscription_manager = Some(sub.clone());
862
0
            Ok(sub)
863
        }
864
0
    }
865
866
0
    async fn update_data<T>(&self, data: T) -> Result<Option<i64>, Error>
867
0
    where
868
0
        T: SchedulerStoreDataProvider
869
0
            + SchedulerStoreKeyProvider
870
0
            + SchedulerCurrentVersionProvider
871
0
            + Send,
872
0
    {
873
0
        let key = data.get_key();
874
0
        let encoded_key = self.encode_key(&key);
875
0
        let maybe_index = data.get_indexes().map_err(|e| {
876
0
            make_err!(
877
0
                Code::Internal,
878
                "Error getting indexes in ExperimentalMongoStore::update_data: {e}"
879
            )
880
0
        })?;
881
882
0
        if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {
  Branch (882:12): [Folded - Ignored]
  Branch (882:12): [True: 0, False: 0]
  Branch (882:12): [Folded - Ignored]
883
0
            let current_version = data.current_version();
884
0
            let data_bytes = data.try_into_bytes().map_err(|e| {
885
0
                make_err!(
886
0
                    Code::Internal,
887
                    "Could not convert value to bytes in ExperimentalMongoStore::update_data: {e}"
888
                )
889
0
            })?;
890
891
0
            let mut update_doc = doc! {
892
                "$set": {
893
0
                    DATA_FIELD: Bson::Binary(mongodb::bson::Binary {
894
0
                        subtype: mongodb::bson::spec::BinarySubtype::Generic,
895
0
                        bytes: data_bytes.to_vec(),
896
0
                    }),
897
                },
898
                "$inc": {
899
                    VERSION_FIELD: 1i64,
900
                }
901
            };
902
903
            // Add indexes
904
0
            for (name, value) in maybe_index {
905
0
                update_doc.get_document_mut("$set").unwrap().insert(
906
0
                    name,
907
0
                    Bson::Binary(mongodb::bson::Binary {
908
0
                        subtype: mongodb::bson::spec::BinarySubtype::Generic,
909
0
                        bytes: value.to_vec(),
910
0
                    }),
911
0
                );
912
0
            }
913
914
0
            let filter = doc! {
915
0
                KEY_FIELD: encoded_key.as_ref(),
916
0
                VERSION_FIELD: current_version,
917
            };
918
919
0
            match self
920
0
                .scheduler_collection
921
0
                .find_one_and_update(filter, update_doc)
922
0
                .upsert(true)
923
0
                .return_document(ReturnDocument::After)
924
0
                .await
925
            {
926
0
                Ok(Some(doc)) => Ok(doc.get_i64(VERSION_FIELD).ok().or(Some(1i64))),
927
0
                Ok(None) => Ok(None),
928
0
                Err(e) => Err(make_err!(
929
0
                    Code::Internal,
930
0
                    "MongoDB error in update_data: {e}"
931
0
                )),
932
            }
933
        } else {
934
0
            let data_bytes = data.try_into_bytes().map_err(|e| {
935
0
                make_err!(
936
0
                    Code::Internal,
937
                    "Could not convert value to bytes in ExperimentalMongoStore::update_data: {e}"
938
                )
939
0
            })?;
940
941
0
            let mut doc = doc! {
942
0
                KEY_FIELD: encoded_key.as_ref(),
943
0
                DATA_FIELD: Bson::Binary(mongodb::bson::Binary {
944
0
                    subtype: mongodb::bson::spec::BinarySubtype::Generic,
945
0
                    bytes: data_bytes.to_vec(),
946
0
                }),
947
            };
948
949
            // Add indexes
950
0
            for (name, value) in maybe_index {
951
0
                doc.insert(
952
0
                    name,
953
0
                    Bson::Binary(mongodb::bson::Binary {
954
0
                        subtype: mongodb::bson::spec::BinarySubtype::Generic,
955
0
                        bytes: value.to_vec(),
956
0
                    }),
957
0
                );
958
0
            }
959
960
0
            self.scheduler_collection
961
0
                .update_one(
962
0
                    doc! { KEY_FIELD: encoded_key.as_ref() },
963
0
                    doc! { "$set": doc },
964
0
                )
965
0
                .upsert(true)
966
0
                .await
967
0
                .map_err(|e| {
968
0
                    make_err!(Code::Internal, "Failed to update scheduler document: {e}")
969
0
                })?;
970
971
0
            Ok(Some(0))
972
        }
973
0
    }
974
975
0
    async fn search_by_index_prefix<K>(
976
0
        &self,
977
0
        index: K,
978
0
    ) -> Result<
979
0
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
980
0
        Error,
981
0
    >
982
0
    where
983
0
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
984
0
    {
985
0
        let index_value = index.index_value();
986
987
        // Create index if it doesn't exist
988
0
        let index_name = format!("{}_{}", K::KEY_PREFIX, K::INDEX_NAME);
989
0
        self.scheduler_collection
990
0
            .create_index(
991
0
                IndexModel::builder()
992
0
                    .keys(doc! { K::INDEX_NAME: 1 })
993
0
                    .options(IndexOptions::builder().name(index_name).build())
994
0
                    .build(),
995
0
            )
996
0
            .await
997
0
            .map_err(|e| make_err!(Code::Internal, "Failed to create scheduler index: {e}"))?;
998
999
        // Build filter
1000
0
        let filter = doc! {
1001
            K::INDEX_NAME: {
1002
0
                "$regex": format!("^{}", regex::escape(index_value.as_ref())),
1003
            }
1004
        };
1005
1006
        // Add sort if specified
1007
0
        let find_options = if let Some(sort_key) = K::MAYBE_SORT_KEY {
  Branch (1007:35): [Folded - Ignored]
  Branch (1007:35): [True: 0, False: 0]
  Branch (1007:35): [True: 0, False: 0]
  Branch (1007:35): [Folded - Ignored]
1008
0
            FindOptions::builder().sort(doc! { sort_key: 1 }).build()
1009
        } else {
1010
0
            FindOptions::default()
1011
        };
1012
1013
0
        let cursor = self
1014
0
            .scheduler_collection
1015
0
            .find(filter)
1016
0
            .with_options(find_options)
1017
0
            .await
1018
0
            .map_err(|e| {
1019
0
                make_err!(
1020
0
                    Code::Internal,
1021
                    "Failed to create cursor in search_by_index_prefix: {e}"
1022
                )
1023
0
            })?;
1024
1025
0
        Ok(cursor.map(move |result| {
1026
0
            let doc = result.map_err(|e| {
1027
0
                make_err!(
1028
0
                    Code::Internal,
1029
                    "Error reading document in search_by_index_prefix: {e}"
1030
                )
1031
0
            })?;
1032
1033
0
            let data = match doc.get(DATA_FIELD) {
1034
0
                Some(Bson::Binary(binary)) => Bytes::from(binary.bytes.clone()),
1035
                _ => {
1036
0
                    return Err(make_err!(
1037
0
                        Code::Internal,
1038
0
                        "Missing or invalid data field in search_by_index_prefix"
1039
0
                    ));
1040
                }
1041
            };
1042
1043
0
            let version = if <K as SchedulerIndexProvider>::Versioned::VALUE {
  Branch (1043:30): [Folded - Ignored]
  Branch (1043:30): [True: 0, False: 0]
  Branch (1043:30): [True: 0, False: 0]
  Branch (1043:30): [Folded - Ignored]
1044
0
                doc.get_i64(VERSION_FIELD).unwrap_or(0)
1045
            } else {
1046
0
                0
1047
            };
1048
1049
0
            K::decode(version, data).map_err(|e| {
1050
0
                make_err!(
1051
0
                    Code::Internal,
1052
                    "Failed to decode in search_by_index_prefix: {e}"
1053
                )
1054
0
            })
1055
0
        }))
1056
0
    }
1057
1058
0
    async fn get_and_decode<K>(
1059
0
        &self,
1060
0
        key: K,
1061
0
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
1062
0
    where
1063
0
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
1064
0
    {
1065
0
        let key = key.get_key();
1066
0
        let encoded_key = self.encode_key(&key);
1067
0
        let filter = doc! { KEY_FIELD: encoded_key.as_ref() };
1068
1069
0
        let doc = self
1070
0
            .scheduler_collection
1071
0
            .find_one(filter)
1072
0
            .await
1073
0
            .map_err(|e| {
1074
0
                make_err!(
1075
0
                    Code::Internal,
1076
                    "Failed to find document in get_and_decode: {e}"
1077
                )
1078
0
            })?;
1079
1080
0
        let Some(doc) = doc else {
  Branch (1080:13): [Folded - Ignored]
  Branch (1080:13): [True: 0, False: 0]
  Branch (1080:13): [Folded - Ignored]
1081
0
            return Ok(None);
1082
        };
1083
1084
0
        let data = match doc.get(DATA_FIELD) {
1085
0
            Some(Bson::Binary(binary)) => Bytes::from(binary.bytes.clone()),
1086
0
            _ => return Ok(None),
1087
        };
1088
1089
0
        let version = if <K as SchedulerStoreKeyProvider>::Versioned::VALUE {
  Branch (1089:26): [Folded - Ignored]
  Branch (1089:26): [True: 0, False: 0]
  Branch (1089:26): [Folded - Ignored]
1090
0
            doc.get_i64(VERSION_FIELD).unwrap_or(0)
1091
        } else {
1092
0
            0
1093
        };
1094
1095
0
        Ok(Some(K::decode(version, data).map_err(|e| {
1096
0
            make_err!(Code::Internal, "Failed to decode in get_and_decode: {e}")
1097
0
        })?))
1098
0
    }
1099
}
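
The listing above shows that only the early validation paths of ExperimentalMongoStore::new are exercised (the empty-connection-string check at source line 121 and the write-concern consistency check at source line 197); everything past ClientOptions parsing is uncovered. Below is a minimal sketch of a test that would drive exactly those two covered error paths. It assumes, hypothetically, that ExperimentalMongoSpec implements Default, that the module path nativelink_store::mongo_store is public, and that a tokio test runtime is available; none of these assumptions are confirmed by this report.

    // Sketch only: exercises the two validation error paths shown as covered above.
    // Assumptions (not confirmed by this report): ExperimentalMongoSpec implements
    // Default, the module path below is public, and #[tokio::test] is available.
    // Field names are taken from the listing above.
    use nativelink_config::stores::ExperimentalMongoSpec;
    use nativelink_store::mongo_store::ExperimentalMongoStore;

    #[tokio::test]
    async fn new_rejects_invalid_specs() {
        // Empty connection string -> error (source line 122 in the listing).
        let spec = ExperimentalMongoSpec::default();
        assert!(ExperimentalMongoStore::new(spec).await.is_err());

        // write_concern_j set without write_concern_w -> error (source line 198).
        let spec = ExperimentalMongoSpec {
            connection_string: "mongodb://localhost:27017".to_string(),
            write_concern_j: Some(true),
            ..ExperimentalMongoSpec::default()
        };
        assert!(ExperimentalMongoStore::new(spec).await.is_err());
    }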